Skip to content

Commit

Permalink
Size reduction of execution scanner workflow (cadence-workflow#3313)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Jun 5, 2020
1 parent 9ffa5e4 commit 754870f
Show file tree
Hide file tree
Showing 12 changed files with 1,334 additions and 353 deletions.
3 changes: 3 additions & 0 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ var keys = map[Key]string{
HistoryScannerEnabled: "worker.historyScannerEnabled",
ExecutionsScannerEnabled: "worker.executionsScannerEnabled",
ExecutionsScannerBlobstoreFlushThreshold: "worker.executionsScannerBlobstoreFlushThreshold",
ExecutionsScannerActivityBatchSize: "worker.executionsScannerActivityBatchSize",
ExecutionsScannerConcurrency: "worker.executionsScannerConcurrency",
ExecutionsScannerPersistencePageSize: "worker.executionsScannerPersistencePageSize",
ExecutionsScannerInvariantCollectionHistory: "worker.executionsScannerInvariantCollectionHistory",
Expand Down Expand Up @@ -754,6 +755,8 @@ const (
ExecutionsScannerConcurrency
// ExecutionsScannerBlobstoreFlushThreshold indicates the flush threshold of blobstore in execution scanner
ExecutionsScannerBlobstoreFlushThreshold
// ExecutionsScannerActivityBatchSize indicates the batch size of scanner activities
ExecutionsScannerActivityBatchSize
// ExecutionsScannerPersistencePageSize indicates the page size of execution persistence fetches in execution scanner
ExecutionsScannerPersistencePageSize
// ExecutionsScannerInvariantCollectionMutableState indicates if mutable state invariant checks should be run
Expand Down
179 changes: 136 additions & 43 deletions service/worker/scanner/executions/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ package executions
import (
"context"
"encoding/json"
"errors"

"go.uber.org/cadence"

"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/activity"
Expand All @@ -47,6 +48,11 @@ const (
FixerCorruptedKeysActivityName = "cadence-sys-executions-fixer-corrupted-keys-activity"
// FixerFixShardActivityName is the activity name for FixShardActivity
FixerFixShardActivityName = "cadence-sys-executions-fixer-fix-shard-activity"

// ErrScanWorkflowNotClosed indicates fix was attempted on scan workflow which was not finished
ErrScanWorkflowNotClosed = "scan workflow is not closed, only can run fix on output of finished scan workflow"
// ErrSerialization indicates a serialization or deserialization error occurred
ErrSerialization = "encountered serialization error"
)

type (
Expand All @@ -57,40 +63,57 @@ type (

// ScanShardActivityParams is the parameter for ScanShardActivity
ScanShardActivityParams struct {
ShardID int
Shards []int
ExecutionsPageSize int
BlobstoreFlushThreshold int
InvariantCollections InvariantCollections
}

// ScannerEmitMetricsActivityParams is the parameter for ScannerEmitMetricsActivity
ScannerEmitMetricsActivityParams struct {
ShardStatusResult ShardStatusResult
AggregateReportResult AggregateScanReportResult
ShardSuccessCount int
ShardControlFlowFailureCount int
AggregateReportResult AggregateScanReportResult
}

// FixerCorruptedKeysActivityParams is the parameter for FixerCorruptedKeysActivity
FixerCorruptedKeysActivityParams struct {
ScannerWorkflowWorkflowID string
ScannerWorkflowRunID string
StartingShardID *int
}

// FixShardActivityParams is the parameter for FixShardActivity
FixShardActivityParams struct {
CorruptedKeysEntry CorruptedKeysEntry
CorruptedKeysEntries []CorruptedKeysEntry
ResolvedFixerWorkflowConfig ResolvedFixerWorkflowConfig
}

// FixerCorruptedKeysActivityResult is the result of FixerCorruptedKeysActivity
FixerCorruptedKeysActivityResult struct {
CorruptedKeys []CorruptedKeysEntry
Shards []int
CorruptedKeys []CorruptedKeysEntry
MinShard *int
MaxShard *int
ShardQueryPaginationToken ShardQueryPaginationToken
}

// CorruptedKeysEntry is a pair of shardID and corrupted keys
CorruptedKeysEntry struct {
ShardID int
CorruptedKeys common.Keys
}

// ScanShardHeartbeatDetails is the heartbeat details for scan shard
ScanShardHeartbeatDetails struct {
LastShardIndexHandled int
Reports []common.ShardScanReport
}

// FixShardHeartbeatDetails is the heartbeat details for the fix shard
FixShardHeartbeatDetails struct {
LastShardIndexHandled int
Reports []common.ShardFixReport
}
)

// ScannerEmitMetricsActivity will emit metrics for a complete run of scanner
Expand All @@ -99,18 +122,8 @@ func ScannerEmitMetricsActivity(
params ScannerEmitMetricsActivityParams,
) error {
scope := activityCtx.Value(ScannerContextKey).(ScannerContext).Scope.Tagged(metrics.ActivityTypeTag(ScannerEmitMetricsActivityName))
shardSuccess := 0
shardControlFlowFailure := 0
for _, v := range params.ShardStatusResult {
switch v {
case ShardStatusSuccess:
shardSuccess++
case ShardStatusControlFlowFailure:
shardControlFlowFailure++
}
}
scope.UpdateGauge(metrics.CadenceShardSuccessGauge, float64(shardSuccess))
scope.UpdateGauge(metrics.CadenceShardFailureGauge, float64(shardControlFlowFailure))
scope.UpdateGauge(metrics.CadenceShardSuccessGauge, float64(params.ShardSuccessCount))
scope.UpdateGauge(metrics.CadenceShardFailureGauge, float64(params.ShardControlFlowFailureCount))

agg := params.AggregateReportResult
scope.UpdateGauge(metrics.ScannerExecutionsGauge, float64(agg.ExecutionsCount))
Expand All @@ -123,17 +136,46 @@ func ScannerEmitMetricsActivity(
return nil
}

// ScanShardActivity will scan all executions in a shard and check for invariant violations.
// ScanShardActivity scans a collection of shards for invariant violations.
// Progress is checkpointed through activity heartbeat details: on retry the
// activity resumes after the last shard index it already handled rather than
// rescanning from the beginning. Returns one report per shard scanned.
func ScanShardActivity(
	activityCtx context.Context,
	params ScanShardActivityParams,
) ([]common.ShardScanReport, error) {
	// -1 means "no shard handled yet"; overwritten by recorded heartbeat state.
	details := ScanShardHeartbeatDetails{LastShardIndexHandled: -1}
	if activity.HasHeartbeatDetails(activityCtx) {
		if err := activity.GetHeartbeatDetails(activityCtx, &details); err != nil {
			return nil, err
		}
	}
	for idx, shardID := range params.Shards {
		// Skip shards already covered by a previous attempt of this activity.
		if idx <= details.LastShardIndexHandled {
			continue
		}
		report, err := scanShard(activityCtx, params, shardID, details)
		if err != nil {
			return nil, err
		}
		// Record progress only after the shard scan fully succeeded.
		details = ScanShardHeartbeatDetails{
			LastShardIndexHandled: idx,
			Reports:               append(details.Reports, *report),
		}
	}
	return details.Reports, nil
}

func scanShard(
activityCtx context.Context,
params ScanShardActivityParams,
shardID int,
heartbeatDetails ScanShardHeartbeatDetails,
) (*common.ShardScanReport, error) {
ctx := activityCtx.Value(ScannerContextKey).(ScannerContext)
resources := ctx.Resource
scope := ctx.Scope.Tagged(metrics.ActivityTypeTag(ScannerScanShardActivityName))
sw := scope.StartTimer(metrics.CadenceLatency)
defer sw.Stop()
execManager, err := resources.GetExecutionManager(params.ShardID)
execManager, err := resources.GetExecutionManager(shardID)
if err != nil {
scope.IncCounter(metrics.CadenceFailures)
return nil, err
Expand All @@ -147,13 +189,13 @@ func ScanShardActivity(
}
pr := common.NewPersistenceRetryer(execManager, resources.GetHistoryManager())
scanner := shard.NewScanner(
params.ShardID,
shardID,
pr,
params.ExecutionsPageSize,
resources.GetBlobstoreClient(),
params.BlobstoreFlushThreshold,
collections,
func() { activity.RecordHeartbeat(activityCtx) })
func() { activity.RecordHeartbeat(activityCtx, heartbeatDetails) })
report := scanner.Scan()
if report.Result.ControlFlowFailure != nil {
scope.IncCounter(metrics.CadenceFailures)
Expand All @@ -172,6 +214,7 @@ func ScannerConfigActivity(
Concurrency: dc.Concurrency(),
ExecutionsPageSize: dc.ExecutionsPageSize(),
BlobstoreFlushThreshold: dc.BlobstoreFlushThreshold(),
ActivityBatchSize: dc.ActivityBatchSize(),
InvariantCollections: InvariantCollections{
InvariantCollectionMutableState: dc.DynamicConfigInvariantCollections.InvariantCollectionMutableState(),
InvariantCollectionHistory: dc.DynamicConfigInvariantCollections.InvariantCollectionHistory(),
Expand All @@ -193,12 +236,16 @@ func ScannerConfigActivity(
if overwrites.InvariantCollections != nil {
result.InvariantCollections = *overwrites.InvariantCollections
}
if overwrites.ActivityBatchSize != nil {
result.ActivityBatchSize = *overwrites.ActivityBatchSize
}
return result, nil
}

// FixerCorruptedKeysActivity will check that provided scanner workflow is closed
// get corrupt keys from it, and flatten these keys into a list. If provided scanner
// workflow is not closed or query fails then error will be returned.
// FixerCorruptedKeysActivity will fetch the keys of blobs from shards with corruptions from a completed scan workflow.
// If scan workflow is not closed or if query fails activity will return an error.
// Accepts as input the shard to start query at and returns a next page token, therefore this activity can
// be used to do pagination.
func FixerCorruptedKeysActivity(
activityCtx context.Context,
params FixerCorruptedKeysActivityParams,
Expand All @@ -216,7 +263,14 @@ func FixerCorruptedKeysActivity(
return nil, err
}
if descResp.WorkflowExecutionInfo.CloseStatus == nil {
return nil, errors.New("provided scan workflow is not closed, can only use finished scan")
return nil, cadence.NewCustomError(ErrScanWorkflowNotClosed)
}
queryArgs := PaginatedShardQueryRequest{
StartingShardID: params.StartingShardID,
}
queryArgsBytes, err := json.Marshal(queryArgs)
if err != nil {
return nil, cadence.NewCustomError(ErrSerialization)
}
queryResp, err := client.QueryWorkflow(activityCtx, &shared.QueryWorkflowRequest{
Domain: c.StringPtr(c.SystemLocalDomainName),
Expand All @@ -226,42 +280,81 @@ func FixerCorruptedKeysActivity(
},
Query: &shared.WorkflowQuery{
QueryType: c.StringPtr(ShardCorruptKeysQuery),
QueryArgs: queryArgsBytes,
},
})
if err != nil {
return nil, err
}
var corruptedKeys ShardCorruptKeysResult
if err := json.Unmarshal(queryResp.QueryResult, &corruptedKeys); err != nil {
return nil, err
queryResult := &ShardCorruptKeysQueryResult{}
if err := json.Unmarshal(queryResp.QueryResult, &queryResult); err != nil {
return nil, cadence.NewCustomError(ErrSerialization)
}
var corrupted []CorruptedKeysEntry
var shards []int
for k, v := range corruptedKeys {
var minShardID *int
var maxShardID *int
for sid, keys := range queryResult.Result {
if minShardID == nil || *minShardID > sid {
minShardID = c.IntPtr(sid)
}
if maxShardID == nil || *maxShardID < sid {
maxShardID = c.IntPtr(sid)
}
corrupted = append(corrupted, CorruptedKeysEntry{
ShardID: k,
CorruptedKeys: v,
ShardID: sid,
CorruptedKeys: keys,
})
shards = append(shards, k)
}
return &FixerCorruptedKeysActivityResult{
CorruptedKeys: corrupted,
Shards: shards,
CorruptedKeys: corrupted,
MinShard: minShardID,
MaxShard: maxShardID,
ShardQueryPaginationToken: queryResult.ShardQueryPaginationToken,
}, nil
}

// FixShardActivity will fetch blobs of corrupted executions from scan workflow.
// It will then iterate over all corrupted executions and run fix on them.
// FixShardActivity fixes a collection of shards.
// Progress is checkpointed through activity heartbeat details: on retry the
// activity resumes after the last shard index it already handled instead of
// starting over. Returns one fix report per shard processed.
func FixShardActivity(
	activityCtx context.Context,
	params FixShardActivityParams,
) ([]common.ShardFixReport, error) {
	// -1 means "no shard handled yet"; overwritten by recorded heartbeat state.
	details := FixShardHeartbeatDetails{LastShardIndexHandled: -1}
	if activity.HasHeartbeatDetails(activityCtx) {
		if err := activity.GetHeartbeatDetails(activityCtx, &details); err != nil {
			return nil, err
		}
	}
	for idx, entry := range params.CorruptedKeysEntries {
		// Skip entries already covered by a previous attempt of this activity.
		if idx <= details.LastShardIndexHandled {
			continue
		}
		report, err := fixShard(activityCtx, params, entry.ShardID, entry.CorruptedKeys, details)
		if err != nil {
			return nil, err
		}
		// Record progress only after the shard fix fully succeeded.
		details = FixShardHeartbeatDetails{
			LastShardIndexHandled: idx,
			Reports:               append(details.Reports, *report),
		}
	}
	return details.Reports, nil
}

func fixShard(
activityCtx context.Context,
params FixShardActivityParams,
shardID int,
corruptedKeys common.Keys,
heartbeatDetails FixShardHeartbeatDetails,
) (*common.ShardFixReport, error) {
ctx := activityCtx.Value(FixerContextKey).(FixerContext)
resources := ctx.Resource
scope := ctx.Scope.Tagged(metrics.ActivityTypeTag(FixerFixShardActivityName))
sw := scope.StartTimer(metrics.CadenceLatency)
defer sw.Stop()
execManager, err := resources.GetExecutionManager(params.CorruptedKeysEntry.ShardID)
execManager, err := resources.GetExecutionManager(shardID)
if err != nil {
scope.IncCounter(metrics.CadenceFailures)
return nil, err
Expand All @@ -275,13 +368,13 @@ func FixShardActivity(
}
pr := common.NewPersistenceRetryer(execManager, resources.GetHistoryManager())
fixer := shard.NewFixer(
params.CorruptedKeysEntry.ShardID,
shardID,
pr,
resources.GetBlobstoreClient(),
params.CorruptedKeysEntry.CorruptedKeys,
corruptedKeys,
params.ResolvedFixerWorkflowConfig.BlobstoreFlushThreshold,
collections,
func() { activity.RecordHeartbeat(activityCtx) })
func() { activity.RecordHeartbeat(activityCtx, heartbeatDetails) })
report := fixer.Fix()
if report.Result.ControlFlowFailure != nil {
scope.IncCounter(metrics.CadenceFailures)
Expand Down
Loading

0 comments on commit 754870f

Please sign in to comment.