Archival blob integrity check (cadence-workflow#1981)
andrewjdawson2016 authored Jun 10, 2019
1 parent d793f7e commit 0999c92
Showing 17 changed files with 815 additions and 194 deletions; only the first six file diffs are reproduced below.
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
@@ -716,3 +716,8 @@ func ArchivalDeterministicConstructionCheckFailReason(deterministicConstructionC
func ArchivalNonDeterministicBlobKey(nondeterministicBlobKey string) Tag {
return newStringTag("archival-non-deterministic-blob-key", nondeterministicBlobKey)
}

// ArchivalBlobIntegrityCheckFailReason returns tag for ArchivalBlobIntegrityCheckFailReason
func ArchivalBlobIntegrityCheckFailReason(blobIntegrityCheckFailReason string) Tag {
return newStringTag("archival-blob-integrity-check-fail-reason", blobIntegrityCheckFailReason)
}
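
A usage sketch for the new tag, patterned on the existing archival fail-reason tags; the helper, logger call, and reason string below are illustrative, not part of this commit:

```go
package archiver

import (
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
)

// logBlobIntegrityFailure is a hypothetical helper showing how the new
// tag attaches a structured fail reason to an error log line.
func logBlobIntegrityFailure(logger log.Logger, reason string) {
	logger.Error("archival blob integrity check failed",
		tag.ArchivalBlobIntegrityCheckFailReason(reason))
}
```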
6 changes: 6 additions & 0 deletions common/metrics/defs.go
@@ -1339,6 +1339,9 @@ const (
ArchiverRunningDeterministicConstructionCheckCount
ArchiverDeterministicConstructionCheckFailedCount
ArchiverCouldNotRunDeterministicConstructionCheckCount
ArchiverRunningBlobIntegrityCheckCount
ArchiverBlobIntegrityCheckFailedCount
ArchiverCouldNotRunBlobIntegrityCheckCount
ArchiverStartedCount
ArchiverStoppedCount
ArchiverCoroutineStartedCount
@@ -1582,6 +1585,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ArchiverRunningDeterministicConstructionCheckCount: {metricName: "archiver_running_deterministic_construction_check"},
ArchiverDeterministicConstructionCheckFailedCount: {metricName: "archiver_deterministic_construction_check_failed"},
ArchiverCouldNotRunDeterministicConstructionCheckCount: {metricName: "archiver_could_not_run_deterministic_construction_check"},
ArchiverRunningBlobIntegrityCheckCount: {metricName: "archiver_running_blob_integrity_check"},
ArchiverCouldNotRunBlobIntegrityCheckCount: {metricName: "archiver_could_not_run_blob_integrity_check"},
ArchiverBlobIntegrityCheckFailedCount: {metricName: "archiver_blob_integrity_check_failed"},
ArchiverStartedCount: {metricName: "archiver_started"},
ArchiverStoppedCount: {metricName: "archiver_stopped"},
ArchiverCoroutineStartedCount: {metricName: "archiver_coroutine_started"},
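The three counters follow the registration pattern above: an enum index paired with a snake_case metric name. A sketch of how the outcomes partition across them, matching the emission points added to uploadHistoryActivity later in this diff (the wrapper function itself is hypothetical; the real emission sites are inline):

```go
package archiver

import "github.com/uber/cadence/common/metrics"

// recordIntegrityCheck sketches the counter semantics: "running" marks an
// attempt, "could not run" a check aborted by a download failure, and
// "failed" a genuine hash mismatch.
func recordIntegrityCheck(scope metrics.Scope, couldNotRun, mismatch bool) {
	scope.IncCounter(metrics.ArchiverRunningBlobIntegrityCheckCount)
	if couldNotRun {
		scope.IncCounter(metrics.ArchiverCouldNotRunBlobIntegrityCheckCount)
		return
	}
	if mismatch {
		scope.IncCounter(metrics.ArchiverBlobIntegrityCheckFailedCount)
	}
}
```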
3 changes: 3 additions & 0 deletions common/service/dynamicconfig/constants.go
@@ -193,6 +193,7 @@ var keys = map[Key]string{
WorkerArchiverConcurrency: "worker.ArchiverConcurrency",
WorkerArchivalsPerIteration: "worker.ArchivalsPerIteration",
WorkerDeterministicConstructionCheckProbability: "worker.DeterministicConstructionCheckProbability",
WorkerBlobIntegrityCheckProbability: "worker.BlobIntegrityCheckProbability",
WorkerTimeLimitPerArchivalIteration: "worker.TimeLimitPerArchivalIteration",
WorkerThrottledLogRPS: "worker.throttledLogRPS",
ScannerPersistenceMaxQPS: "worker.scannerPersistenceMaxQPS",
@@ -501,6 +502,8 @@ const (
WorkerArchivalsPerIteration
// WorkerDeterministicConstructionCheckProbability controls the probability of running a deterministic construction check for any given archival
WorkerDeterministicConstructionCheckProbability
// WorkerBlobIntegrityCheckProbability controls the probability of running an integrity check for any given archival
WorkerBlobIntegrityCheckProbability
// WorkerTimeLimitPerArchivalIteration controls the time limit of each iteration of archival workflow
WorkerTimeLimitPerArchivalIteration
// WorkerThrottledLogRPS is the rate limit on number of log messages emitted per second for throttled logger
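WorkerBlobIntegrityCheckProbability feeds the shouldRun gate used in activities.go below. That helper is not part of this diff; a minimal sketch of such a probability gate, assuming one uniform random draw per archival:

```go
package archiver

import "math/rand"

// shouldRun returns true with the given probability: 0.0 never runs the
// check, 1.0 runs it for every archival. Sketch only; the helper this
// commit relies on may be implemented differently.
func shouldRun(probability float64) bool {
	return rand.Float64() < probability
}
```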
82 changes: 11 additions & 71 deletions service/frontend/workflowHandler.go
@@ -40,7 +40,6 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/blobstore"
"github.com/uber/cadence/common/blobstore/blob"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/client"
"github.com/uber/cadence/common/clock"
@@ -78,6 +77,7 @@ type (
versionChecker *versionChecker
domainHandler *domainHandlerImpl
visibilityQueryValidator *common.VisibilityQueryValidator
historyBlobDownloader archiver.HistoryBlobDownloader
service.Service
}

@@ -93,11 +93,6 @@ type (
ReplicationInfo map[string]*gen.ReplicationInfo
}

getHistoryContinuationTokenArchival struct {
BlobstorePageToken int
CloseFailoverVersion int64
}

domainGetter interface {
GetDomain() string
}
@@ -130,7 +125,6 @@ var (

// err for archival
errDomainHasNeverBeenEnabledForArchival = &gen.BadRequestError{Message: "Attempted to fetch history from archival, but domain has never been enabled for archival."}
errInvalidNextArchivalPageToken = &gen.BadRequestError{Message: "Invalid NextPageToken for archival."}

// err for string too long
errDomainTooLong = &gen.BadRequestError{Message: "Domain length exceeds limit."}
@@ -177,6 +171,7 @@ func NewWorkflowHandler(sVice service.Service, config *Config, metadataMgr persi
NewDomainReplicator(kafkaProducer, sVice.GetLogger()),
),
visibilityQueryValidator: common.NewQueryValidator(config.ValidSearchAttributes),
historyBlobDownloader: archiver.NewHistoryBlobDownloader(blobstoreClient),
}
// prevent us from trying to serve requests before handler's Start() is complete
handler.startWG.Add(1)
@@ -3155,21 +3150,6 @@ func serializeHistoryToken(token *getHistoryContinuationToken) ([]byte, error) {
return bytes, err
}

func deserializeHistoryTokenArchival(bytes []byte) (*getHistoryContinuationTokenArchival, error) {
token := &getHistoryContinuationTokenArchival{}
err := json.Unmarshal(bytes, token)
return token, err
}

func serializeHistoryTokenArchival(token *getHistoryContinuationTokenArchival) ([]byte, error) {
if token == nil {
return nil, nil
}

bytes, err := json.Marshal(token)
return bytes, err
}

func createServiceBusyError() *gen.ServiceBusyError {
err := &gen.ServiceBusyError{}
err.Message = "Too many outstanding requests to the cadence service"
@@ -3214,60 +3194,20 @@ func (wh *WorkflowHandler) getArchivedHistory(
if archivalBucket == "" {
return nil, wh.error(errDomainHasNeverBeenEnabledForArchival, scope)
}
var token *getHistoryContinuationTokenArchival
if request.NextPageToken != nil {
token, err = deserializeHistoryTokenArchival(request.NextPageToken)
if err != nil {
return nil, wh.error(errInvalidNextArchivalPageToken, scope)
}
} else {
indexKey, err := archiver.NewHistoryIndexBlobKey(domainID, request.Execution.GetWorkflowId(), request.Execution.GetRunId())
if err != nil {
return nil, wh.error(err, scope)
}
indexTags, err := wh.blobstoreClient.GetTags(ctx, archivalBucket, indexKey)
if err != nil {
return nil, wh.error(err, scope)
}
highestVersion, err := archiver.GetHighestVersion(indexTags)
if err != nil {
return nil, wh.error(err, scope)
}
token = &getHistoryContinuationTokenArchival{
BlobstorePageToken: common.FirstBlobPageToken,
CloseFailoverVersion: *highestVersion,
}
}
key, err := archiver.NewHistoryBlobKey(domainID, request.Execution.GetWorkflowId(), request.Execution.GetRunId(), token.CloseFailoverVersion, token.BlobstorePageToken)
if err != nil {
return nil, wh.error(err, scope)
}
b, err := wh.blobstoreClient.Download(ctx, archivalBucket, key)
if err != nil {
return nil, wh.error(err, scope)
}
unwrappedBlob, wrappingLayers, err := blob.Unwrap(b)
if err != nil {
return nil, wh.error(err, scope)
}
downloadReq := &archiver.DownloadBlobRequest{
NextPageToken: request.NextPageToken,
ArchivalBucket: archivalBucket,
DomainID: domainID,
WorkflowID: request.GetExecution().GetWorkflowId(),
RunID: request.GetExecution().GetRunId(),
}
historyBlob := &archiver.HistoryBlob{}
switch *wrappingLayers.EncodingFormat {
case blob.JSONEncoding:
if err := json.Unmarshal(unwrappedBlob.Body, historyBlob); err != nil {
return nil, wh.error(err, scope)
}
}
token.BlobstorePageToken = *historyBlob.Header.NextPageToken
if *historyBlob.Header.IsLast {
token = nil
}
nextToken, err := serializeHistoryTokenArchival(token)
resp, err := wh.historyBlobDownloader.DownloadBlob(ctx, downloadReq)
if err != nil {
return nil, wh.error(err, scope)
}
return &gen.GetWorkflowExecutionHistoryResponse{
History: historyBlob.Body,
NextPageToken: nextToken,
History: resp.HistoryBlob.Body,
NextPageToken: resp.NextPageToken,
Archived: common.BoolPtr(true),
}, nil
}
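The net effect of the handler change: token parsing, index-blob lookup, key construction, and blob decoding all move behind archiver.HistoryBlobDownloader, so the handler only forwards the opaque NextPageToken. A sketch of driving the downloader to exhaustion; the helper is hypothetical and the import paths are assumptions based on the packages this file references:

```go
package frontend

import (
	"context"

	gen "github.com/uber/cadence/.gen/go/shared"
	"github.com/uber/cadence/common/blobstore"
	"github.com/uber/cadence/service/worker/archiver"
)

// fetchAllArchivedPages is a hypothetical helper: each DownloadBlob call
// returns one history page plus an opaque token, and an empty token
// means the final page has been served.
func fetchAllArchivedPages(ctx context.Context, client blobstore.Client,
	bucket, domainID, workflowID, runID string) ([]*gen.History, error) {
	downloader := archiver.NewHistoryBlobDownloader(client)
	req := &archiver.DownloadBlobRequest{
		ArchivalBucket: bucket,
		DomainID:       domainID,
		WorkflowID:     workflowID,
		RunID:          runID,
	}
	var pages []*gen.History
	for {
		resp, err := downloader.DownloadBlob(ctx, req)
		if err != nil {
			return nil, err
		}
		pages = append(pages, resp.HistoryBlob.Body)
		if len(resp.NextPageToken) == 0 {
			return pages, nil
		}
		req.NextPageToken = resp.NextPageToken
	}
}
```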
89 changes: 0 additions & 89 deletions service/frontend/workflowHandler_test.go
@@ -986,7 +986,6 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_InvalidPageToken()
resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest([]byte{3, 4, 5, 1}), s.testDomainID, metrics.NoopScope(metrics.Frontend))
s.Nil(resp)
s.Error(err)
s.Equal(errInvalidNextArchivalPageToken, err)
}

func (s *workflowHandlerSuite) TestGetArchivedHistory_Failure_InvalidBlobKey() {
@@ -1072,94 +1071,6 @@ func (s *workflowHandlerSuite) TestGetArchivedHistory_Success_GetFirstPage() {
s.NotNil(resp)
s.NotNil(resp.History)
s.True(resp.GetArchived())
expectedNextPageToken := &getHistoryContinuationTokenArchival{
BlobstorePageToken: 2,
CloseFailoverVersion: 10,
}
expectedSerializedNextPageToken, err := serializeHistoryTokenArchival(expectedNextPageToken)
s.NoError(err)
s.Equal(expectedSerializedNextPageToken, resp.NextPageToken)
}

func (s *workflowHandlerSuite) TestGetArchivedHistory_Success_SecondPageIndexNotUsed() {
config := s.newConfig()
mMetadataManager := &mocks.MetadataManager{}
mMetadataManager.On("GetDomain", mock.Anything).Return(persistenceGetDomainResponse(testArchivalBucket, shared.ArchivalStatusEnabled), nil)
clusterMetadata := &mocks.ClusterMetadata{}
clusterMetadata.On("IsGlobalDomainEnabled").Return(false)
mService := cs.NewTestService(clusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.mockClientBean)
mBlobstore := &mocks.BlobstoreClient{}
unwrappedBlob := &archiver.HistoryBlob{
Header: &archiver.HistoryBlobHeader{
CurrentPageToken: common.IntPtr(common.FirstBlobPageToken + 1),
NextPageToken: common.IntPtr(common.FirstBlobPageToken + 2),
IsLast: common.BoolPtr(false),
},
Body: &shared.History{},
}
bytes, err := json.Marshal(unwrappedBlob)
s.NoError(err)
historyBlob, err := blob.Wrap(blob.NewBlob(bytes, map[string]string{}), blob.JSONEncoded())
s.NoError(err)
historyKey, _ := archiver.NewHistoryBlobKey(s.testDomainID, testWorkflowID, testRunID, 10, common.FirstBlobPageToken+1)
mBlobstore.On("Download", mock.Anything, mock.Anything, historyKey).Return(historyBlob, nil)
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, mBlobstore)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()
pageToken, err := serializeHistoryTokenArchival(&getHistoryContinuationTokenArchival{
BlobstorePageToken: common.FirstBlobPageToken + 1,
CloseFailoverVersion: 10,
})
s.NoError(err)
resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(pageToken), s.testDomainID, metrics.NoopScope(metrics.Frontend))
s.NoError(err)
s.NotNil(resp)
s.NotNil(resp.History)
s.True(resp.GetArchived())
expectedNextPageToken := &getHistoryContinuationTokenArchival{
BlobstorePageToken: common.FirstBlobPageToken + 2,
CloseFailoverVersion: 10,
}
expectedSerializedNextPageToken, err := serializeHistoryTokenArchival(expectedNextPageToken)
s.NoError(err)
s.Equal(expectedSerializedNextPageToken, resp.NextPageToken)
}

func (s *workflowHandlerSuite) TestGetArchivedHistory_Success_GetLastPage() {
config := s.newConfig()
mMetadataManager := &mocks.MetadataManager{}
mMetadataManager.On("GetDomain", mock.Anything).Return(persistenceGetDomainResponse(testArchivalBucket, shared.ArchivalStatusEnabled), nil)
clusterMetadata := &mocks.ClusterMetadata{}
clusterMetadata.On("IsGlobalDomainEnabled").Return(false)
mService := cs.NewTestService(clusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.mockClientBean)
mBlobstore := &mocks.BlobstoreClient{}
unwrappedBlob := &archiver.HistoryBlob{
Header: &archiver.HistoryBlobHeader{
CurrentPageToken: common.IntPtr(5),
NextPageToken: common.IntPtr(common.LastBlobNextPageToken),
IsLast: common.BoolPtr(true),
},
Body: &shared.History{},
}
bytes, err := json.Marshal(unwrappedBlob)
s.NoError(err)
historyBlob, err := blob.Wrap(blob.NewBlob(bytes, map[string]string{}), blob.JSONEncoded())
s.NoError(err)
historyKey, _ := archiver.NewHistoryBlobKey(s.testDomainID, testWorkflowID, testRunID, 10, 5)
mBlobstore.On("Download", mock.Anything, mock.Anything, historyKey).Return(historyBlob, nil)
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager, mBlobstore)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.startWG.Done()
pageToken, err := serializeHistoryTokenArchival(&getHistoryContinuationTokenArchival{
BlobstorePageToken: 5,
CloseFailoverVersion: 10,
})
resp, err := wh.getArchivedHistory(context.Background(), getHistoryRequest(pageToken), s.testDomainID, metrics.NoopScope(metrics.Frontend))
s.NoError(err)
s.NotNil(resp)
s.NotNil(resp.History)
s.True(resp.GetArchived())
s.Nil(resp.NextPageToken)
}

func (s *workflowHandlerSuite) TestGetHistory() {
42 changes: 40 additions & 2 deletions service/worker/archiver/activities.go
@@ -123,8 +123,11 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
}
blobstoreClient := container.Blobstore

handledLastBlob := false
var handledLastBlob bool
var totalUploadSize int64

runBlobIntegrityCheck := shouldRun(container.Config.BlobIntegrityCheckProbability())
var uploadedHistoryEventHashes []uint64
for pageToken := common.FirstBlobPageToken; !handledLastBlob; pageToken++ {
key, err := NewHistoryBlobKey(request.DomainID, request.WorkflowID, request.RunID, request.CloseFailoverVersion, pageToken)
if err != nil {
Expand Down Expand Up @@ -156,6 +159,11 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(errorDetails(err)), tag.Error(err))
return err
}
if runBlobIntegrityCheck {
for _, e := range historyBlob.Body.Events {
uploadedHistoryEventHashes = append(uploadedHistoryEventHashes, hash(e.String()))
}
}

if historyMutated(historyBlob, &request) {
scope.IncCounter(metrics.ArchiverHistoryMutatedCount)
@@ -199,7 +207,6 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
}
continue
}

if err := uploadBlob(ctx, blobstoreClient, request.BucketName, key, blob); err != nil {
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(errorDetails(err)), tag.ArchivalBlobKey(key.String()), tag.Error(err))
return err
@@ -225,6 +232,37 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(errorDetails(err)), tag.ArchivalBlobKey(indexBlobKey.String()), tag.Error(err))
return err
}
if runBlobIntegrityCheck {
scope.IncCounter(metrics.ArchiverRunningBlobIntegrityCheckCount)
blobDownloader := container.HistoryBlobDownloader
if blobDownloader == nil {
blobDownloader = NewHistoryBlobDownloader(blobstoreClient)
}
req := &DownloadBlobRequest{
ArchivalBucket: request.BucketName,
DomainID: request.DomainID,
WorkflowID: request.WorkflowID,
RunID: request.RunID,
CloseFailoverVersion: common.Int64Ptr(request.CloseFailoverVersion),
}
var fetchedHistoryEventHashes []uint64
for len(fetchedHistoryEventHashes) == 0 || len(req.NextPageToken) != 0 {
resp, err := blobDownloader.DownloadBlob(ctx, req)
if err != nil {
scope.IncCounter(metrics.ArchiverCouldNotRunBlobIntegrityCheckCount)
logger.Error("failed to access history for blob integrity check", tag.Error(err))
return nil
}
for _, e := range resp.HistoryBlob.Body.Events {
fetchedHistoryEventHashes = append(fetchedHistoryEventHashes, hash(e.String()))
}
req.NextPageToken = resp.NextPageToken
}
if !hashesEqual(fetchedHistoryEventHashes, uploadedHistoryEventHashes) {
scope.IncCounter(metrics.ArchiverBlobIntegrityCheckFailedCount)
logger.Error("uploaded history does not match fetched history")
}
}
return nil
}

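The integrity check compares per-event hashes rather than whole payloads, so its memory cost scales with event count rather than history size. The hash and hashesEqual helpers are not shown in this diff; a sketch consistent with their uint64 signatures, assuming FNV-1a and an order-sensitive comparison:

```go
package archiver

import "hash/fnv"

// hash digests an event's string form to a uint64. FNV-1a is an
// assumption; only the signature is fixed by the call sites above.
func hash(s string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(s)) // the fnv hash's Write never returns an error
	return h.Sum64()
}

// hashesEqual reports element-wise equality: same events, same order.
func hashesEqual(a, b []uint64) bool {
	if len(a) != len(b) {
		return false
	}
	for i, v := range a {
		if v != b[i] {
			return false
		}
	}
	return true
}
```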