Skip to content

Commit

Permalink
Archival: Add failover version check when uploading history (cadence-…
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jun 5, 2019
1 parent 7b2e335 commit e182b10
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 3 deletions.
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1333,6 +1333,7 @@ const (
IndexProcessorCorruptedData
ArchiverNonRetryableErrorCount
ArchiverSkipUploadCount
ArchiverHistoryMutatedCount
ArchiverRunningDeterministicConstructionCheckCount
ArchiverDeterministicConstructionCheckFailedCount
ArchiverCouldNotRunDeterministicConstructionCheckCount
Expand Down Expand Up @@ -1573,6 +1574,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
IndexProcessorCorruptedData: {metricName: "index_processor_corrupted_data"},
ArchiverNonRetryableErrorCount: {metricName: "archiver_non_retryable_error"},
ArchiverSkipUploadCount: {metricName: "archiver_skip_upload"},
ArchiverHistoryMutatedCount: {metricName: "archiver_history_mutated"},
ArchiverRunningDeterministicConstructionCheckCount: {metricName: "archiver_running_deterministic_construction_check"},
ArchiverDeterministicConstructionCheckFailedCount: {metricName: "archiver_deterministic_construction_check_failed"},
ArchiverCouldNotRunDeterministicConstructionCheckCount: {metricName: "archiver_could_not_run_deterministic_construction_check"},
Expand Down
11 changes: 10 additions & 1 deletion service/worker/archiver/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,12 @@ const (

errDeleteHistoryV1 = "failed to delete history from events_v1"
errDeleteHistoryV2 = "failed to delete history from events_v2"

errHistoryMutated = "history was mutated during uploading"
)

var (
uploadHistoryActivityNonRetryableErrors = []string{errGetDomainByID, errConstructKey, errGetTags, errUploadBlob, errReadBlob, errEmptyBucket, errConstructBlob, errDownloadBlob}
uploadHistoryActivityNonRetryableErrors = []string{errGetDomainByID, errConstructKey, errGetTags, errUploadBlob, errReadBlob, errEmptyBucket, errConstructBlob, errDownloadBlob, errHistoryMutated}
deleteBlobActivityNonRetryableErrors = []string{errConstructKey, errGetTags, errUploadBlob, errEmptyBucket, errDeleteBlob}
deleteHistoryActivityNonRetryableErrors = []string{errDeleteHistoryV1, errDeleteHistoryV2}
errContextTimeout = errors.New("activity aborted because context timed out")
Expand Down Expand Up @@ -153,6 +155,13 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
logger.Error(uploadErrorMsg, tag.UploadFailReason("could not get history blob from reader"))
return err
}

if historyMutated(historyBlob, &request) {
scope.IncCounter(metrics.ArchiverHistoryMutatedCount)
logger.Error(uploadErrorMsg, tag.UploadFailReason("history was mutated during archiving"))
return cadence.NewCustomError(errHistoryMutated)
}

if runConstTest {
// some tags are specific to the cluster and time a blob was uploaded from/when
// this only updates those specific tags, all other parts of the blob are left unchanged
Expand Down
51 changes: 49 additions & 2 deletions service/worker/archiver/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,9 @@ func (s *activitiesSuite) TestUploadHistoryActivity_Success_BlobDoesNotAlreadyEx
mockHistoryBlobReader := &HistoryBlobReaderMock{}
mockHistoryBlobReader.On("GetBlob", mock.Anything).Return(&HistoryBlob{
Header: &HistoryBlobHeader{
IsLast: common.BoolPtr(true),
LastFailoverVersion: common.Int64Ptr(testCloseFailoverVersion),
LastEventID: common.Int64Ptr(testNextEventID - 1),
IsLast: common.BoolPtr(true),
},
}, nil)
container := &BootstrapContainer{
Expand Down Expand Up @@ -670,7 +672,9 @@ func (s *activitiesSuite) TestUploadHistoryActivity_Success_ConcurrentUploads()
mockHistoryBlobReader := &HistoryBlobReaderMock{}
mockHistoryBlobReader.On("GetBlob", common.FirstBlobPageToken+1).Return(&HistoryBlob{
Header: &HistoryBlobHeader{
IsLast: common.BoolPtr(true),
LastFailoverVersion: common.Int64Ptr(testCloseFailoverVersion),
LastEventID: common.Int64Ptr(testNextEventID - 1),
IsLast: common.BoolPtr(true),
},
}, nil)
container := &BootstrapContainer{
Expand Down Expand Up @@ -700,6 +704,49 @@ func (s *activitiesSuite) TestUploadHistoryActivity_Success_ConcurrentUploads()
s.NoError(err)
}

// TestUploadHistoryActivity_Fail_HistoryMutated checks that uploadHistoryActivity
// fails with errHistoryMutated when the history blob reader hands back a blob
// whose LastFailoverVersion is greater than the request's CloseFailoverVersion.
func (s *activitiesSuite) TestUploadHistoryActivity_Fail_HistoryMutated() {
// Metric expectations: the activity scope is requested once, and both the
// non-retryable-error counter and the history-mutated counter are each
// expected to be incremented once.
s.metricsClient.On("Scope", metrics.ArchiverUploadHistoryActivityScope, []metrics.Tag{metrics.DomainTag(testDomainName)}).Return(s.metricsScope).Once()
s.metricsScope.On("IncCounter", metrics.ArchiverNonRetryableErrorCount).Once()
s.metricsScope.On("IncCounter", metrics.ArchiverHistoryMutatedCount).Once()
// Key of the first blob page; the construction error is deliberately ignored
// because the inputs are fixed test constants.
firstKey, _ := NewHistoryBlobKey(testDomainID, testWorkflowID, testRunID, testCloseFailoverVersion, common.FirstBlobPageToken)
domainCache, mockClusterMetadata := s.archivalConfig(true, testArchivalBucket, true)
mockBlobstore := &mocks.BlobstoreClient{}
// The blobstore reports that the first blob does not exist yet.
// NOTE(review): presumably this is what makes the activity go on to read the
// blob from the history reader instead of skipping the page - confirm against
// uploadHistoryActivity.
mockBlobstore.On("GetTags", mock.Anything, mock.Anything, firstKey).Return(nil, blobstore.ErrBlobNotExists).Once()
mockHistoryBlobReader := &HistoryBlobReaderMock{}
// Return a history blob with a larger failover version
mockHistoryBlobReader.On("GetBlob", common.FirstBlobPageToken).Return(&HistoryBlob{
Header: &HistoryBlobHeader{
LastFailoverVersion: common.Int64Ptr(testCloseFailoverVersion + 1),
IsLast: common.BoolPtr(true),
},
}, nil)
// Dependencies are handed to the activity through the bootstrap container,
// which is attached to the background activity context below.
container := &BootstrapContainer{
Logger: s.logger,
MetricsClient: s.metricsClient,
DomainCache: domainCache,
ClusterMetadata: mockClusterMetadata,
Blobstore: mockBlobstore,
HistoryBlobReader: mockHistoryBlobReader,
Config: getConfig(false),
}
env := s.NewTestActivityEnvironment()
env.SetWorkerOptions(worker.Options{
BackgroundActivityContext: context.WithValue(context.Background(), bootstrapContainerKey, container),
})
// The request closes at testCloseFailoverVersion, one less than the version
// carried by the blob header returned from the mocked reader.
request := ArchiveRequest{
DomainID: testDomainID,
DomainName: testDomainName,
WorkflowID: testWorkflowID,
RunID: testRunID,
BranchToken: testBranchToken,
NextEventID: testNextEventID,
CloseFailoverVersion: testCloseFailoverVersion,
BucketName: testArchivalBucket,
}
_, err := env.ExecuteActivity(uploadHistoryActivity, request)
// The returned error's message must be exactly the errHistoryMutated reason.
// Note that err is dereferenced unconditionally, so an unexpected success
// would surface as a nil-pointer panic rather than an assertion failure.
s.Equal(errHistoryMutated, err.Error())
}

func (s *activitiesSuite) TestDeleteBlobActivity_Fail_ConstructBlobKeyError() {
s.metricsClient.On("Scope", metrics.ArchiverDeleteBlobActivityScope, []metrics.Tag{metrics.DomainTag(testDomainName)}).Return(s.metricsScope).Once()
s.metricsScope.On("IncCounter", metrics.ArchiverNonRetryableErrorCount).Once()
Expand Down
15 changes: 15 additions & 0 deletions service/worker/archiver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"github.com/dgryski/go-farm"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"go.uber.org/cadence"
Expand Down Expand Up @@ -96,6 +97,20 @@ func shouldRun(probability float64) bool {
return rand.Intn(int(1.0/probability)) == 0
}

// historyMutated reports whether a history blob read for archival disagrees
// with the archive request it is being uploaded for.
//
// The blob is considered mutated when either:
//   - its header's LastFailoverVersion is greater than
//     request.CloseFailoverVersion (checked for every blob), or
//   - its header is flagged IsLast and LastFailoverVersion differs from
//     request.CloseFailoverVersion, or LastEventID+1 differs from
//     request.NextEventID.
//
// A blob that is not flagged IsLast and whose failover version does not exceed
// the close version is reported as not mutated.
//
// historyBlob, historyBlob.Header and request are dereferenced without nil
// checks. NOTE(review): the pointer-valued header fields are read through
// common.Int64Default / common.BoolDefault, which presumably yield the zero
// value for nil pointers - confirm in the common package.
func historyMutated(historyBlob *HistoryBlob, request *ArchiveRequest) bool {
	header := historyBlob.Header
	blobVersion := common.Int64Default(header.LastFailoverVersion)
	closeVersion := request.CloseFailoverVersion

	switch {
	case blobVersion > closeVersion:
		// A newer failover version than the one recorded on the request.
		return true
	case !common.BoolDefault(header.IsLast):
		// The remaining checks only apply to the final blob.
		return false
	}

	expectedNextEventID := common.Int64Default(header.LastEventID) + 1
	if blobVersion != closeVersion {
		return true
	}
	return expectedNextEventID != request.NextEventID
}

func validateArchivalRequest(request *ArchiveRequest) error {
if len(request.BucketName) == 0 {
// this should not be able to occur, if domain enables archival bucket should always be set
Expand Down
64 changes: 64 additions & 0 deletions service/worker/archiver/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber/cadence/common"
"go.uber.org/cadence"
)

Expand Down Expand Up @@ -89,6 +90,69 @@ func (s *UtilSuite) TestHashesEqual() {
}
}

// TestHistoryMutated runs historyMutated against a table of blob-header /
// archive-request pairs and checks the reported verdict for each.
func (s *UtilSuite) TestHistoryMutated() {
	type mutationCase struct {
		blob     *HistoryBlob
		req      *ArchiveRequest
		expected bool
	}

	cases := []mutationCase{
		{
			// Blob failover version is greater than the request's close version.
			blob: &HistoryBlob{
				Header: &HistoryBlobHeader{
					LastFailoverVersion: common.Int64Ptr(15),
				},
			},
			req: &ArchiveRequest{
				CloseFailoverVersion: 3,
			},
			expected: true,
		},
		{
			// Last blob, versions match, but LastEventID+1 != NextEventID.
			blob: &HistoryBlob{
				Header: &HistoryBlobHeader{
					LastFailoverVersion: common.Int64Ptr(10),
					LastEventID:         common.Int64Ptr(50),
					IsLast:              common.BoolPtr(true),
				},
			},
			req: &ArchiveRequest{
				CloseFailoverVersion: 10,
				NextEventID:          34,
			},
			expected: true,
		},
		{
			// Last blob whose failover version is lower than the close version.
			blob: &HistoryBlob{
				Header: &HistoryBlobHeader{
					LastFailoverVersion: common.Int64Ptr(9),
					IsLast:              common.BoolPtr(true),
				},
			},
			req: &ArchiveRequest{
				CloseFailoverVersion: 10,
			},
			expected: true,
		},
		{
			// Last blob with matching version and LastEventID+1 == NextEventID.
			blob: &HistoryBlob{
				Header: &HistoryBlobHeader{
					LastFailoverVersion: common.Int64Ptr(10),
					LastEventID:         common.Int64Ptr(33),
					IsLast:              common.BoolPtr(true),
				},
			},
			req: &ArchiveRequest{
				CloseFailoverVersion: 10,
				NextEventID:          34,
			},
			expected: false,
		},
	}

	for i := range cases {
		c := cases[i]
		s.Equal(c.expected, historyMutated(c.blob, c.req))
	}
}

func (s *UtilSuite) TestValidateRequest() {
testCases := []struct {
request *ArchiveRequest
Expand Down

0 comments on commit e182b10

Please sign in to comment.