Commit 95713db

Support managed failover workflow with drill mode (cadence-workflow#4177)

yux0 authored May 7, 2021
1 parent fc63ab1

Showing 8 changed files with 414 additions and 221 deletions.
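For reference, a minimal sketch (not part of this commit) of how a failover drill could be started through this workflow, assuming a Cadence client.Client bound to the system domain; the helper name startFailoverDrill, the cluster names, the timeout, and the batch sizes are illustrative only. A non-zero DrillWaitTime is what marks the run as a drill: after the wait, the workflow fails the domains back to the source cluster.

package main

import (
	"context"
	"time"

	"go.uber.org/cadence/client"

	"github.com/uber/cadence/service/worker/failovermanager"
)

// startFailoverDrill starts the managed failover workflow in drill mode.
func startFailoverDrill(ctx context.Context, cc client.Client) error {
	opts := client.StartWorkflowOptions{
		ID:                           failovermanager.DrillWorkflowID,
		TaskList:                     failovermanager.TaskListName,
		ExecutionStartToCloseTimeout: time.Hour,
	}
	params := &failovermanager.FailoverParams{
		TargetCluster:                  "clusterB",
		SourceCluster:                  "clusterA",
		BatchFailoverSize:              10,
		BatchFailoverWaitTimeInSeconds: 30,
		// Non-zero DrillWaitTime: fail back to SourceCluster after this wait.
		DrillWaitTime: 10 * time.Minute,
	}
	_, err := cc.StartWorkflow(ctx, opts, failovermanager.WorkflowTypeName, params)
	return err
}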
5 changes: 5 additions & 0 deletions service/worker/failovermanager/starter.go
@@ -26,7 +26,9 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/uber-go/tally"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/worker"
"go.uber.org/cadence/workflow"

"github.com/uber/cadence/client"
"github.com/uber/cadence/common"
@@ -95,6 +97,9 @@ func (s *FailoverManager) Start() error {
Tracer: opentracing.GlobalTracer(),
}
failoverWorker := worker.New(s.svcClient, common.SystemLocalDomainName, TaskListName, workerOpts)
failoverWorker.RegisterWorkflowWithOptions(FailoverWorkflow, workflow.RegisterOptions{Name: WorkflowTypeName})
failoverWorker.RegisterActivityWithOptions(FailoverActivity, activity.RegisterOptions{Name: failoverActivityName})
failoverWorker.RegisterActivityWithOptions(GetDomainsActivity, activity.RegisterOptions{Name: getDomainsActivityName})
s.worker = failoverWorker
return failoverWorker.Start()
}
137 changes: 93 additions & 44 deletions service/worker/failovermanager/workflow.go
@@ -48,6 +48,7 @@ const (
WorkflowTypeName = "cadence-sys-failoverManager-workflow"
// WorkflowID will be reused to ensure only one workflow running
WorkflowID = "cadence-failover-manager"
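// DrillWorkflowID is the ID of the failover drill workflow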
DrillWorkflowID = WorkflowID + "-drill"
failoverActivityName = "cadence-sys-failover-activity"
getDomainsActivityName = "cadence-sys-getDomains-activity"

@@ -68,6 +69,8 @@ const (

// workflow states for query

// WorkflowInitialized state
WorkflowInitialized = "initialized"
// WorkflowRunning state
WorkflowRunning = "running"
// WorkflowPaused state
@@ -93,12 +96,16 @@ type (
BatchFailoverWaitTimeInSeconds int
// Domains are the candidate domains to fail over
Domains []string
// DrillWaitTime defines the wait time of a failover drill
DrillWaitTime time.Duration
}

// FailoverResult is workflow result
FailoverResult struct {
SuccessDomains []string
FailedDomains []string
SuccessDomains []string
FailedDomains []string
SuccessResetDomains []string
FailedResetDomains []string
}

// GetDomainsActivityParams params for activity
@@ -122,24 +129,20 @@ type (

// QueryResult for failover progress
QueryResult struct {
TotalDomains int
Success int
Failed int
State string
TargetCluster string
SourceCluster string
SuccessDomains []string // SuccessDomains are guaranteed succeed processed
FailedDomains []string // FailedDomains contains false positive
Operator string
TotalDomains int
Success int
Failed int
State string
TargetCluster string
SourceCluster string
SuccessDomains []string // SuccessDomains are guaranteed to have been processed successfully
FailedDomains []string // FailedDomains may contain false positives
SuccessResetDomains []string // SuccessResetDomains are domains successfully reset in drill mode
FailedResetDomains []string // FailedResetDomains may contain false positives in drill mode
Operator string
}
)

func init() {
workflow.RegisterWithOptions(FailoverWorkflow, workflow.RegisterOptions{Name: WorkflowTypeName})
activity.RegisterWithOptions(FailoverActivity, activity.RegisterOptions{Name: failoverActivityName})
activity.RegisterWithOptions(GetDomainsActivity, activity.RegisterOptions{Name: getDomainsActivityName})
}

// FailoverWorkflow is the workflow that manages failing over all domains with IsManagedByCadence=true
func FailoverWorkflow(ctx workflow.Context, params *FailoverParams) (*FailoverResult, error) {
err := validateParams(params)
@@ -150,20 +153,24 @@ func FailoverWorkflow(ctx workflow.Context, params *FailoverParams) (*FailoverRe
// define query properties
var failedDomains []string
var successDomains []string
var successResetDomains []string
var failedResetDomains []string
var totalNumOfDomains int
wfState := WorkflowRunning
wfState := WorkflowInitialized
operator := getOperator(ctx)
err = workflow.SetQueryHandler(ctx, QueryType, func(input []byte) (*QueryResult, error) {
return &QueryResult{
TotalDomains: totalNumOfDomains,
Success: len(successDomains),
Failed: len(failedDomains),
State: wfState,
TargetCluster: params.TargetCluster,
SourceCluster: params.SourceCluster,
SuccessDomains: successDomains,
FailedDomains: failedDomains,
Operator: operator,
TotalDomains: totalNumOfDomains,
Success: len(successDomains),
Failed: len(failedDomains),
State: wfState,
TargetCluster: params.TargetCluster,
SourceCluster: params.SourceCluster,
SuccessDomains: successDomains,
FailedDomains: failedDomains,
SuccessResetDomains: successResetDomains,
FailedResetDomains: failedResetDomains,
Operator: operator,
}, nil
})
if err != nil {
@@ -184,30 +191,70 @@ func FailoverWorkflow(ctx workflow.Context, params *FailoverParams) (*FailoverRe
}
totalNumOfDomains = len(domains)

// failover in batch
ao = workflow.WithActivityOptions(ctx, getFailoverActivityOptions())
batchSize := params.BatchFailoverSize
times := len(domains)/batchSize + 1

pauseCh := workflow.GetSignalChannel(ctx, PauseSignal)
resumeCh := workflow.GetSignalChannel(ctx, ResumeSignal)
var shouldPause bool

for i := 0; i < times; i++ {
// check whether we need to pause
checkPauseSignal := func() {
shouldPause = pauseCh.ReceiveAsync(nil)
if shouldPause {
wfState = WorkflowPaused
resumeCh.Receive(ctx, nil)
// clean up all pending pause signals
cleanupChannel(pauseCh)
}
wfState = WorkflowRunning
}

// failover in batch
successDomains, failedDomains = failoverDomainsByBatch(ctx, domains, params, checkPauseSignal, false)

if params.DrillWaitTime == 0 {
// This is a normal failover
wfState = WorkflowCompleted
return &FailoverResult{
SuccessDomains: successDomains,
FailedDomains: failedDomains,
}, nil
}

workflow.Sleep(ctx, params.DrillWaitTime)
// Reset domains to original cluster
successResetDomains, failedResetDomains = failoverDomainsByBatch(ctx, domains, params, checkPauseSignal, true)
wfState = WorkflowCompleted

return &FailoverResult{
SuccessDomains: successDomains,
FailedDomains: failedDomains,
SuccessResetDomains: successResetDomains,
FailedResetDomains: failedResetDomains,
}, nil
}
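As a usage note (not part of the diff), a running failover or drill can be paused before the next batch and later resumed through the signal channels consumed by checkPauseSignal above. A minimal sketch, reusing the package and imports from the start-a-drill sketch under the commit header; the helper names are hypothetical.

// pauseDrill pauses the in-flight drill before the next batch is dispatched.
func pauseDrill(ctx context.Context, cc client.Client) error {
	return cc.SignalWorkflow(ctx, failovermanager.DrillWorkflowID, "", failovermanager.PauseSignal, nil)
}

// resumeDrill resumes a paused drill.
func resumeDrill(ctx context.Context, cc client.Client) error {
	return cc.SignalWorkflow(ctx, failovermanager.DrillWorkflowID, "", failovermanager.ResumeSignal, nil)
}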

func failoverDomainsByBatch(
ctx workflow.Context,
domains []string,
params *FailoverParams,
pauseSignalHandler func(),
reverseFailover bool,
) (successDomains []string, failedDomains []string) {

totalNumOfDomains := len(domains)
batchSize := params.BatchFailoverSize
times := totalNumOfDomains/batchSize + 1
ao := workflow.WithActivityOptions(ctx, getFailoverActivityOptions())
targetCluster := params.TargetCluster
if reverseFailover {
targetCluster = params.SourceCluster
}
for i := 0; i < times; i++ {
pauseSignalHandler()

failoverActivityParams := &FailoverActivityParams{
Domains: domains[i*batchSize : common.MinInt((i+1)*batchSize, totalNumOfDomains)],
TargetCluster: params.TargetCluster,
TargetCluster: targetCluster,
}
var actResult FailoverActivityResult
err = workflow.ExecuteActivity(ao, FailoverActivity, failoverActivityParams).Get(ctx, &actResult)
err := workflow.ExecuteActivity(ao, FailoverActivity, failoverActivityParams).Get(ctx, &actResult)
if err != nil {
// Domains in a failed activity may or may not have been failed over, but we treat them all as failed.
// This means the FailedDomains query result can contain false positives.
@@ -216,15 +263,9 @@ func FailoverWorkflow(ctx workflow.Context, params *FailoverParams) (*FailoverRe
successDomains = append(successDomains, actResult.SuccessDomains...)
failedDomains = append(failedDomains, actResult.FailedDomains...)
}

workflow.Sleep(ctx, time.Duration(params.BatchFailoverWaitTimeInSeconds)*time.Second)
}

wfState = WorkflowCompleted
return &FailoverResult{
SuccessDomains: successDomains,
FailedDomains: failedDomains,
}, nil
return
}

func getOperator(ctx workflow.Context) string {
@@ -405,3 +446,11 @@ func FailoverActivity(ctx context.Context, params *FailoverActivityParams) (*Fai
FailedDomains: failedDomains,
}, nil
}

func cleanupChannel(channel workflow.Channel) {
for {
if hasValue := channel.ReceiveAsync(nil); !hasValue {
return
}
}
}
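Also for reference (not part of this commit), drill progress can be observed through the query handler registered at the top of FailoverWorkflow, including the new reset-domain counts. A minimal sketch, again reusing the package and imports from the first sketch; queryDrillProgress is a hypothetical helper.

// queryDrillProgress returns the current state of the drill workflow.
func queryDrillProgress(ctx context.Context, cc client.Client) (*failovermanager.QueryResult, error) {
	// Empty run ID queries the current run of the drill workflow.
	resp, err := cc.QueryWorkflow(ctx, failovermanager.DrillWorkflowID, "", failovermanager.QueryType)
	if err != nil {
		return nil, err
	}
	var result failovermanager.QueryResult
	if err := resp.Get(&result); err != nil {
		return nil, err
	}
	return &result, nil
}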