Skip to content

Commit

Permalink
Add archival error details to logging (cadence-workflow#1960)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Jun 5, 2019
1 parent e79c070 commit 8221471
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 41 deletions.
31 changes: 11 additions & 20 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,8 +677,6 @@ func ArchivalRequestCloseFailoverVersion(requestCloseFailoverVersion int64) Tag
return newInt64("archival-request-close-failover-version", requestCloseFailoverVersion)
}

// archival tags (blob tags)

// ArchivalBucket returns tag for Bucket
func ArchivalBucket(bucket string) Tag {
return newStringTag("archival-bucket", bucket)
Expand All @@ -689,29 +687,22 @@ func ArchivalBlobKey(blobKey string) Tag {
return newStringTag("archival-blob-key", blobKey)
}

// archival tags (file blobstore tags)

// ArchivalFileBlobstoreBlobPath returns tag for FileBlobstoreBlobPath
func ArchivalFileBlobstoreBlobPath(fileBlobstoreBlobPath string) Tag {
return newStringTag("archival-file-blobstore-blob-path", fileBlobstoreBlobPath)
}

// ArchivalFileBlobstoreMetadataPath returns tag for FileBlobstoreMetadataPath
func ArchivalFileBlobstoreMetadataPath(fileBlobstoreMetadataPath string) Tag {
return newStringTag("archival-file-blobstore-metadata-path", fileBlobstoreMetadataPath)
}

// ArchivalClusterArchivalStatus returns tag for ClusterArchivalStatus
func ArchivalClusterArchivalStatus(clusterArchivalStatus interface{}) Tag {
return newObjectTag("archival-cluster-archival-status", clusterArchivalStatus)
}

// ArchivalUploadSkipReason returns tag for UploadSkipReason
func ArchivalUploadSkipReason(uploadSkipReason string) Tag {
return newStringTag("archival-upload-skip-reason", uploadSkipReason)
// ArchivalUploadFailReason returns tag for ArchivalUploadFailReason
func ArchivalUploadFailReason(uploadFailReason string) Tag {
return newStringTag("archival-upload-fail-reason", uploadFailReason)
}

// UploadFailReason returns tag for UploadFailReason
func UploadFailReason(uploadFailReason string) Tag {
return newStringTag("archival-upload-fail-reason", uploadFailReason)
// ArchivalDeleteHistoryFailReason returns tag for ArchivalDeleteHistoryFailReason
func ArchivalDeleteHistoryFailReason(deleteHistoryFailReason string) Tag {
return newStringTag("archival-delete-history-fail-reason", deleteHistoryFailReason)
}

// ArchivalDeleteBlobsFailReason returns tag for ArchivalDeleteBlobsFailReason
func ArchivalDeleteBlobsFailReason(deleteBlobsFailReason string) Tag {
return newStringTag("archival-delete-blobs-fail-reason", deleteBlobsFailReason)
}
40 changes: 20 additions & 20 deletions service/worker/archiver/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,21 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
clusterMetadata := container.ClusterMetadata
domainCacheEntry, err := getDomainByID(ctx, domainCache, request.DomainID)
if err != nil {
logger.Error(uploadErrorMsg, tag.UploadFailReason("could not get domain cache entry"))
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(errorDetails(err)), tag.Error(err))
return err
}
if clusterMetadata.ArchivalConfig().GetArchivalStatus() != cluster.ArchivalEnabled {
logger.Error(uploadSkipMsg, tag.UploadFailReason("cluster is not enabled for archival"))
logger.Error(uploadSkipMsg, tag.ArchivalUploadFailReason("cluster is not enabled for archival"))
scope.IncCounter(metrics.ArchiverSkipUploadCount)
return nil
}
if domainCacheEntry.GetConfig().ArchivalStatus != shared.ArchivalStatusEnabled {
logger.Error(uploadSkipMsg, tag.UploadFailReason("domain is not enabled for archival"))
logger.Error(uploadSkipMsg, tag.ArchivalUploadFailReason("domain is not enabled for archival"))
scope.IncCounter(metrics.ArchiverSkipUploadCount)
return nil
}
if err := validateArchivalRequest(&request); err != nil {
logger.Error(uploadErrorMsg, tag.UploadFailReason(err.Error()))
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(err.Error()))
return err
}

Expand All @@ -127,13 +127,13 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
for pageToken := common.FirstBlobPageToken; !handledLastBlob; pageToken++ {
key, err := NewHistoryBlobKey(request.DomainID, request.WorkflowID, request.RunID, request.CloseFailoverVersion, pageToken)
if err != nil {
logger.Error(uploadErrorMsg, tag.UploadFailReason("could not construct blob key"))
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason("could not construct blob key"))
return cadence.NewCustomError(errConstructKey, err.Error())
}

tags, err := getTags(ctx, blobstoreClient, request.BucketName, key)
if err != nil && err != blobstore.ErrBlobNotExists {
logger.Error(uploadErrorMsg, tag.UploadFailReason("could not get blob tags"), tag.ArchivalBlobKey(key.String()))
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(errorDetails(err)), tag.ArchivalBlobKey(key.String()), tag.Error(err))
return err
}

Expand All @@ -152,13 +152,13 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err

historyBlob, err := getBlob(ctx, historyBlobReader, pageToken)
if err != nil {
logger.Error(uploadErrorMsg, tag.UploadFailReason("could not get history blob from reader"))
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(errorDetails(err)), tag.Error(err))
return err
}

if historyMutated(historyBlob, &request) {
scope.IncCounter(metrics.ArchiverHistoryMutatedCount)
logger.Error(uploadErrorMsg, tag.UploadFailReason("history was mutated during archiving"))
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason("history was mutated during archiving"))
return cadence.NewCustomError(errHistoryMutated)
}

Expand All @@ -170,13 +170,13 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err

blob, reason, err := constructBlob(historyBlob, container.Config.EnableArchivalCompression(domainName))
if err != nil {
logger.Error(uploadErrorMsg, tag.UploadFailReason(reason), tag.ArchivalBlobKey(key.String()))
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(reason), tag.ArchivalBlobKey(key.String()))
return cadence.NewCustomError(errConstructBlob, err.Error())
}
if runConstTest {
existingBlob, err := downloadBlob(ctx, blobstoreClient, request.BucketName, key)
if err != nil {
logger.Error("failed to download blob for deterministic construction verification", tag.Error(err))
logger.Error("failed to download blob for deterministic construction verification", tag.ArchivalUploadFailReason(errorDetails(err)), tag.Error(err))
scope.IncCounter(metrics.ArchiverCouldNotRunDeterministicConstructionCheckCount)
} else if !blob.Equal(existingBlob) {
logger.Error("deterministic construction check failed")
Expand All @@ -186,27 +186,27 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
}

if err := uploadBlob(ctx, blobstoreClient, request.BucketName, key, blob); err != nil {
logger.Error(uploadErrorMsg, tag.UploadFailReason("could not upload blob"), tag.ArchivalBlobKey(key.String()))
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(errorDetails(err)), tag.ArchivalBlobKey(key.String()), tag.Error(err))
return err
}
handledLastBlob = *historyBlob.Header.IsLast
}
indexBlobKey, err := NewHistoryIndexBlobKey(request.DomainID, request.WorkflowID, request.RunID)
if err != nil {
logger.Error(uploadErrorMsg, tag.UploadFailReason("could not construct index blob key"))
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason("could not construct index blob key"))
return cadence.NewCustomError(errConstructKey, err.Error())
}
existingVersions, err := getTags(ctx, blobstoreClient, request.BucketName, indexBlobKey)
if err != nil && err != blobstore.ErrBlobNotExists {
logger.Error(uploadErrorMsg, tag.UploadFailReason("could not get index blob tags"), tag.ArchivalBlobKey(indexBlobKey.String()))
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(errorDetails(err)), tag.ArchivalBlobKey(indexBlobKey.String()), tag.Error(err))
return err
}
indexBlobWithVersion := addVersion(request.CloseFailoverVersion, existingVersions)
if indexBlobWithVersion == nil {
return nil
}
if err := uploadBlob(ctx, blobstoreClient, request.BucketName, indexBlobKey, indexBlobWithVersion); err != nil {
logger.Error(uploadErrorMsg, tag.UploadFailReason("could not upload index blob"), tag.ArchivalBlobKey(indexBlobKey.String()))
logger.Error(uploadErrorMsg, tag.ArchivalUploadFailReason(errorDetails(err)), tag.ArchivalBlobKey(indexBlobKey.String()), tag.Error(err))
return err
}
return nil
Expand All @@ -232,13 +232,13 @@ func deleteHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
logger := tagLoggerWithRequest(container.Logger, request).WithTags(tag.Attempt(activity.GetInfo(ctx).Attempt))
if request.EventStoreVersion == persistence.EventStoreVersionV2 {
if err := deleteHistoryV2(ctx, container, request); err != nil {
logger.Error("failed to delete history from events v2", tag.Error(err))
logger.Error("failed to delete history from events v2", tag.ArchivalDeleteHistoryFailReason(errorDetails(err)), tag.Error(err))
return err
}
return nil
}
if err := deleteHistoryV1(ctx, container, request); err != nil {
logger.Error("failed to delete history from events v1", tag.Error(err))
logger.Error("failed to delete history from events v1", tag.ArchivalDeleteHistoryFailReason(errorDetails(err)), tag.Error(err))
return err
}
return nil
Expand Down Expand Up @@ -279,7 +279,7 @@ func deleteBlobActivity(ctx context.Context, request ArchiveRequest) (err error)
}
existingVersions, err := getTags(ctx, blobstoreClient, request.BucketName, indexBlobKey)
if err != nil && err != blobstore.ErrBlobNotExists {
logger.Error("could not get index blob tags", tag.ArchivalBlobKey(indexBlobKey.String()), tag.Error(err))
logger.Error("could not get index blob tags", tag.ArchivalBlobKey(indexBlobKey.String()), tag.ArchivalDeleteHistoryFailReason(errorDetails(err)), tag.Error(err))
return err
}
if err != blobstore.ErrBlobNotExists {
Expand All @@ -288,12 +288,12 @@ func deleteBlobActivity(ctx context.Context, request ArchiveRequest) (err error)
if len(indexBlobWithoutVersion.Tags) == 0 {
// We removed the last version in the tag, delete the whole index blob.
if _, err := deleteBlob(ctx, blobstoreClient, request.BucketName, indexBlobKey); err != nil {
logger.Error("failed to delete index blob", tag.ArchivalBlobKey(indexBlobKey.String()), tag.Error(err))
logger.Error("failed to delete index blob", tag.ArchivalBlobKey(indexBlobKey.String()), tag.ArchivalDeleteHistoryFailReason(errorDetails(err)), tag.Error(err))
return err
}
} else {
if err := uploadBlob(ctx, blobstoreClient, request.BucketName, indexBlobKey, indexBlobWithoutVersion); err != nil {
logger.Error("could not upload index blob", tag.ArchivalBlobKey(indexBlobKey.String()), tag.Error(err))
logger.Error("could not upload index blob", tag.ArchivalBlobKey(indexBlobKey.String()), tag.ArchivalDeleteHistoryFailReason(errorDetails(err)), tag.Error(err))
return err
}
}
Expand All @@ -318,7 +318,7 @@ func deleteBlobActivity(ctx context.Context, request ArchiveRequest) (err error)

deleted, err := deleteBlob(ctx, blobstoreClient, request.BucketName, key)
if err != nil {
logger.Error("failed to delete blob", tag.ArchivalBlobKey(key.String()), tag.Error(err))
logger.Error("failed to delete blob", tag.ArchivalBlobKey(key.String()), tag.ArchivalDeleteHistoryFailReason(errorDetails(err)), tag.Error(err))
return err
}
if !deleted && pageToken != startPageToken {
Expand Down
10 changes: 9 additions & 1 deletion service/worker/archiver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,14 @@ func validateArchivalRequest(request *ArchiveRequest) error {
// this should not be able to occur, if domain enables archival bucket should always be set
return cadence.NewCustomError(errEmptyBucket)
}

return nil
}

func errorDetails(err error) string {
var details string
if _, ok := err.(*cadence.CustomError); !ok {
return details
}
err.(*cadence.CustomError).Details(&details)
return details
}

0 comments on commit 8221471

Please sign in to comment.