Skip to content

Commit

Permalink
Start enabled shardscanner fixers (cadence-workflow#3906)
Browse files Browse the repository at this point in the history
  • Loading branch information
mantas-sidlauskas authored Jan 21, 2021
1 parent 8b49962 commit 2f6bede
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 16 deletions.
9 changes: 8 additions & 1 deletion service/worker/scanner/executions/concrete_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (

// ConcreteExecutionsFixerWFTypeName defines workflow type name for concrete executions fixer
ConcreteExecutionsFixerWFTypeName = "cadence-sys-executions-fixer-workflow"
concreteExecutionsFixerWFID = "cadence-sys-executions-fixer"
concreteExecutionsFixerTaskListName = "cadence-sys-executions-fixer-tasklist-0"
)

Expand Down Expand Up @@ -173,13 +174,19 @@ func ConcreteExecutionScannerConfig(dc *dynamicconfig.Collection) *shardscanner.
DynamicCollection: dc,
ScannerHooks: ConcreteExecutionHooks,
FixerHooks: ConcreteExecutionFixerHooks,
FixerTLName: concreteExecutionsFixerTaskListName,
StartWorkflowOptions: cclient.StartWorkflowOptions{
ID: concreteExecutionsScannerWFID,
TaskList: concreteExecutionsScannerTaskListName,
ExecutionStartToCloseTimeout: 20 * 365 * 24 * time.Hour,
WorkflowIDReusePolicy: cclient.WorkflowIDReusePolicyAllowDuplicate,
CronSchedule: "* * * * *",
},
StartFixerOptions: cclient.StartWorkflowOptions{
ID: concreteExecutionsFixerWFID,
TaskList: concreteExecutionsFixerTaskListName,
ExecutionStartToCloseTimeout: 20 * 365 * 24 * time.Hour,
WorkflowIDReusePolicy: cclient.WorkflowIDReusePolicyAllowDuplicate,
CronSchedule: "* * * * *",
},
}
}
15 changes: 11 additions & 4 deletions service/worker/scanner/executions/current_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,16 @@ import (
)

const (
// CurrentExecutionsScannerWFID is the current execution scanner workflow ID
CurrentExecutionsScannerWFID = "cadence-sys-current-executions-scanner"
// currentExecutionsScannerWFID is the current execution scanner workflow ID
currentExecutionsScannerWFID = "cadence-sys-current-executions-scanner"
// CurrentExecutionsScannerWFTypeName is the current execution scanner workflow type
CurrentExecutionsScannerWFTypeName = "cadence-sys-current-executions-scanner-workflow"
// CurrentExecutionsScannerTaskListName is the current execution scanner workflow tasklist
CurrentExecutionsScannerTaskListName = "cadence-sys-current-executions-scanner-tasklist-0"

// CurrentExecutionsFixerWFTypeName is the current execution fixer workflow ID
CurrentExecutionsFixerWFTypeName = "cadence-sys-current-executions-fixer-workflow"
currentExecutionsFixerWFID = "cadence-sys-current-executions-fixer"
// CurrentExecutionsFixerTaskListName is the current execution fixer workflow tasklist
CurrentExecutionsFixerTaskListName = "cadence-sys-current-executions-fixer-tasklist-0"
)
Expand Down Expand Up @@ -144,14 +145,20 @@ func CurrentExecutionScannerConfig(dc *dynamicconfig.Collection) *shardscanner.S
},
ScannerHooks: CurrentExecutionsHooks,
FixerHooks: CurrentExecutionFixerHooks,
FixerTLName: CurrentExecutionsFixerTaskListName,
StartWorkflowOptions: cclient.StartWorkflowOptions{
ID: CurrentExecutionsScannerWFID,
ID: currentExecutionsScannerWFID,
TaskList: CurrentExecutionsScannerTaskListName,
ExecutionStartToCloseTimeout: 20 * 365 * 24 * time.Hour,
WorkflowIDReusePolicy: cclient.WorkflowIDReusePolicyAllowDuplicate,
CronSchedule: "* * * * *",
},
StartFixerOptions: cclient.StartWorkflowOptions{
ID: currentExecutionsFixerWFID,
TaskList: CurrentExecutionsFixerTaskListName,
ExecutionStartToCloseTimeout: 20 * 365 * 24 * time.Hour,
WorkflowIDReusePolicy: cclient.WorkflowIDReusePolicyAllowDuplicate,
CronSchedule: "* * * * *",
},
}
}

Expand Down
27 changes: 19 additions & 8 deletions service/worker/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,20 +181,31 @@ func (s *Scanner) startShardScanner(
getShardFixerContext(s.context, config),
)

workerTaskListNames = append(workerTaskListNames, config.FixerTLName)
go s.startWorkflowWithRetry(
config.StartFixerOptions,
config.FixerWFTypeName,
shardscanner.FixerWorkflowParams{
ScannerWorkflowWorkflowID: config.StartWorkflowOptions.ID,
},
)

workerTaskListNames = append(workerTaskListNames, config.StartFixerOptions.TaskList)
}

if config.DynamicParams.ScannerEnabled() {
workerTaskListNames = append(workerTaskListNames, config.StartWorkflowOptions.TaskList)

go s.startWorkflowWithRetry(config.StartWorkflowOptions, config.ScannerWFTypeName, shardscanner.ScannerWorkflowParams{
Shards: shardscanner.Shards{
Range: &shardscanner.ShardRange{
Min: 0,
Max: s.context.cfg.Persistence.NumHistoryShards,
go s.startWorkflowWithRetry(
config.StartWorkflowOptions,
config.ScannerWFTypeName,
shardscanner.ScannerWorkflowParams{
Shards: shardscanner.Shards{
Range: &shardscanner.ShardRange{
Min: 0,
Max: s.context.cfg.Persistence.NumHistoryShards,
},
},
},
})
})
}

return backgroundActivityContext, workerTaskListNames
Expand Down
26 changes: 26 additions & 0 deletions service/worker/scanner/shardscanner/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ package shardscanner
import (
"context"
"encoding/json"
"errors"
"time"

"go.uber.org/cadence"
"go.uber.org/cadence/.gen/go/shared"
Expand Down Expand Up @@ -179,6 +181,30 @@ func FixerCorruptedKeysActivity(
) (*FixerCorruptedKeysActivityResult, error) {
resource := activityCtx.Value(params.ContextKey).(FixerContext).Resource
client := resource.GetSDKClient()
if params.ScannerWorkflowRunID == "" {
listResp, err := client.ListClosedWorkflowExecutions(activityCtx, &shared.ListClosedWorkflowExecutionsRequest{
Domain: c.StringPtr(c.SystemLocalDomainName),
MaximumPageSize: c.Int32Ptr(1),
NextPageToken: nil,
StartTimeFilter: &shared.StartTimeFilter{
EarliestTime: c.Int64Ptr(0),
LatestTime: c.Int64Ptr(time.Now().UnixNano()),
},
ExecutionFilter: &shared.WorkflowExecutionFilter{
WorkflowId: c.StringPtr(params.ScannerWorkflowWorkflowID),
},
StatusFilter: shared.WorkflowExecutionCloseStatusCompleted.Ptr(),
})
if err != nil {
return nil, err
}
if len(listResp.Executions) != 1 {
return nil, errors.New("got unexpected number of executions back from list")
}

params.ScannerWorkflowRunID = *listResp.Executions[0].Execution.RunId
}

descResp, err := client.DescribeWorkflowExecution(activityCtx, &shared.DescribeWorkflowExecutionRequest{
Domain: c.StringPtr(c.SystemLocalDomainName),
Execution: &shared.WorkflowExecution{
Expand Down
26 changes: 25 additions & 1 deletion service/worker/scanner/shardscanner/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
"context"
"encoding/json"
"testing"
"time"

"github.com/pborman/uuid"
"github.com/uber-go/tally"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -75,7 +77,10 @@ func (s *activitiesSuite) SetupSuite() {
func (s *activitiesSuite) SetupTest() {
s.controller = gomock.NewController(s.T())
s.mockResource = resource.NewTest(s.controller, metrics.Worker)
defer s.controller.Finish()
}

func (s *activitiesSuite) TearDownTest() {
s.controller.Finish()
}

func (s *activitiesSuite) TestScanShardActivity() {
Expand Down Expand Up @@ -404,7 +409,26 @@ func (s *activitiesSuite) TestFixerCorruptedKeysActivity() {
},
}
queryResultData, err := json.Marshal(queryResult)
response := &shared.ListClosedWorkflowExecutionsResponse{
Executions: []*shared.WorkflowExecutionInfo{
{
Execution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr("test-list-workflow-id"),
RunId: common.StringPtr(uuid.New()),
},
Type: &shared.WorkflowType{
Name: common.StringPtr("test-list-workflow-type"),
},
StartTime: common.Int64Ptr(time.Now().UnixNano()),
CloseTime: common.Int64Ptr(time.Now().Add(time.Hour).UnixNano()),
CloseStatus: shared.WorkflowExecutionCloseStatusCompleted.Ptr(),
HistoryLength: common.Int64Ptr(12),
},
},
}
s.NoError(err)
s.mockResource.SDKClient.EXPECT().ListClosedWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(response, nil)

s.mockResource.SDKClient.EXPECT().QueryWorkflow(gomock.Any(), gomock.Any()).Return(&shared.QueryWorkflowResponse{
QueryResult: queryResultData,
}, nil)
Expand Down
2 changes: 1 addition & 1 deletion service/worker/scanner/shardscanner/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,14 @@ type (

// ScannerConfig is the config for ShardScanner workflow
ScannerConfig struct {
FixerTLName string
ScannerWFTypeName string
FixerWFTypeName string
ScannerHooks func() *ScannerHooks
FixerHooks func() *FixerHooks
DynamicParams DynamicParams
DynamicCollection *dynamicconfig.Collection
StartWorkflowOptions cclient.StartWorkflowOptions
StartFixerOptions cclient.StartWorkflowOptions
}

// FixerWorkflowConfigOverwrites enables overwriting the default values.
Expand Down
10 changes: 9 additions & 1 deletion service/worker/scanner/timers/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (
// FixerWFTypeName defines workflow type name for timers fixer
FixerWFTypeName = "cadence-sys-timers-fixer-workflow"
fixerTaskListName = "cadence-sys-timers-fixer-tasklist-0"
fixerwfid = "cadence-sys-timers-fixer"
periodStartKey = "period_start"
periodEndKey = "period_end"

Expand Down Expand Up @@ -183,14 +184,21 @@ func ScannerConfig(dc *dynamicconfig.Collection) *shardscanner.ScannerConfig {
DynamicCollection: dc,
ScannerHooks: ScannerHooks,
FixerHooks: FixerHooks,
FixerTLName: fixerTaskListName,

StartWorkflowOptions: client.StartWorkflowOptions{
ID: wfid,
TaskList: scannerTaskListName,
ExecutionStartToCloseTimeout: 20 * 365 * 24 * time.Hour,
WorkflowIDReusePolicy: client.WorkflowIDReusePolicyAllowDuplicate,
CronSchedule: "* * * * *",
},
StartFixerOptions: client.StartWorkflowOptions{
ID: fixerwfid,
TaskList: fixerTaskListName,
ExecutionStartToCloseTimeout: 20 * 365 * 24 * time.Hour,
WorkflowIDReusePolicy: client.WorkflowIDReusePolicyAllowDuplicate,
CronSchedule: "* * * * *",
},
}
}

Expand Down

0 comments on commit 2f6bede

Please sign in to comment.