Skip to content

Commit

Permalink
Reject query based on workflow state (cadence-workflow#2413)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Aug 20, 2019
1 parent e51f6c5 commit cc54edd
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 7 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ require (
github.com/valyala/fastjson v1.4.1
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2
go.uber.org/atomic v1.4.0
go.uber.org/cadence v0.0.0-20190702231331-27b0ba2bc456
go.uber.org/cadence v0.9.1-0.20190819173509-c3cd9f8f9745
go.uber.org/dig v1.7.0 // indirect
go.uber.org/fx v1.9.0 // indirect
go.uber.org/goleak v0.10.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/cadence v0.0.0-20190702231331-27b0ba2bc456 h1:4nI6vqmnHOgQyeDDepZghHE4YYQuxXFnJvPZA8JKGjs=
go.uber.org/cadence v0.0.0-20190702231331-27b0ba2bc456/go.mod h1:CQivfHCJ44B1kKL4LLtOhcepUwNoRodZceo/wU5Nthw=
go.uber.org/cadence v0.9.1-0.20190819173509-c3cd9f8f9745 h1:UfkGg9w9lbfmLNxHMFp5UlCqzqw/kAZglCSAE2v6aeY=
go.uber.org/cadence v0.9.1-0.20190819173509-c3cd9f8f9745/go.mod h1:CQivfHCJ44B1kKL4LLtOhcepUwNoRodZceo/wU5Nthw=
go.uber.org/dig v1.7.0 h1:E5/L92iQTNJTjfgJF2KgU+/JpMaiuvK2DHLBj0+kSZk=
go.uber.org/dig v1.7.0/go.mod h1:z+dSd2TP9Usi48jL8M3v63iSBVkiwtVyMKxMZYYauPg=
go.uber.org/fx v1.9.0 h1:7OAz8ucp35AU8eydejpYG7QrbE8rLKzGhHbZlJi5LYY=
Expand Down
60 changes: 56 additions & 4 deletions host/queryworkflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ func (s *integrationSuite) TestQueryWorkflow_NonSticky() {
// activity handler
atHandler := func(execution *workflow.WorkflowExecution, activityType *workflow.ActivityType,
activityID string, input []byte, taskToken []byte) ([]byte, bool, error) {

return []byte("Activity Result."), false, nil
}

Expand Down Expand Up @@ -449,7 +448,7 @@ func (s *integrationSuite) TestQueryWorkflow_NonSticky() {
Err error
}
queryResultCh := make(chan QueryResult)
queryWorkflowFn := func(queryType string) {
queryWorkflowFn := func(queryType string, rejectCondition *workflow.QueryRejectCondition) {
queryResp, err := s.engine.QueryWorkflow(createContext(), &workflow.QueryWorkflowRequest{
Domain: common.StringPtr(s.domainName),
Execution: &workflow.WorkflowExecution{
Expand All @@ -459,12 +458,13 @@ func (s *integrationSuite) TestQueryWorkflow_NonSticky() {
Query: &workflow.WorkflowQuery{
QueryType: common.StringPtr(queryType),
},
QueryRejectCondition: rejectCondition,
})
queryResultCh <- QueryResult{Resp: queryResp, Err: err}
}

// call QueryWorkflow in separate goroutinue (because it is blocking). That will generate a query task
go queryWorkflowFn(queryType)
go queryWorkflowFn(queryType, nil)
// process that query task, which should respond via RespondQueryTaskCompleted
for {
// loop until process the query task
Expand All @@ -479,10 +479,11 @@ func (s *integrationSuite) TestQueryWorkflow_NonSticky() {
s.NoError(queryResult.Err)
s.NotNil(queryResult.Resp)
s.NotNil(queryResult.Resp.QueryResult)
s.Nil(queryResult.Resp.QueryRejected)
queryResultString := string(queryResult.Resp.QueryResult)
s.Equal("query-result", queryResultString)

go queryWorkflowFn("invalid-query-type")
go queryWorkflowFn("invalid-query-type", nil)
for {
// loop until process the query task
isQueryTask, errInner := poller.PollAndProcessDecisionTask(false, false)
Expand All @@ -497,6 +498,57 @@ func (s *integrationSuite) TestQueryWorkflow_NonSticky() {
queryFailError, ok := queryResult.Err.(*workflow.QueryFailedError)
s.True(ok)
s.Equal("unknown-query-type", queryFailError.Message)

// advance the state of the decider
_, err = poller.PollAndProcessDecisionTask(false, false)
s.NoError(err)

go queryWorkflowFn(queryType, nil)
// process that query task, which should respond via RespondQueryTaskCompleted
for {
// loop until process the query task
isQueryTask, errInner := poller.PollAndProcessDecisionTask(false, false)
s.Logger.Info("PollAndProcessDecisionTask", tag.Error(err))
s.Nil(errInner)
if isQueryTask {
break
}
} // wait until query result is ready
queryResult = <-queryResultCh
s.NoError(queryResult.Err)
s.NotNil(queryResult.Resp)
s.NotNil(queryResult.Resp.QueryResult)
s.Nil(queryResult.Resp.QueryRejected)
queryResultString = string(queryResult.Resp.QueryResult)
s.Equal("query-result", queryResultString)

rejectCondition := workflow.QueryRejectConditionNotOpen
go queryWorkflowFn(queryType, &rejectCondition)
queryResult = <-queryResultCh
s.NoError(queryResult.Err)
s.NotNil(queryResult.Resp)
s.Nil(queryResult.Resp.QueryResult)
s.NotNil(queryResult.Resp.QueryRejected.CloseStatus)
s.Equal(workflow.WorkflowExecutionCloseStatusCompleted, *queryResult.Resp.QueryRejected.CloseStatus)

rejectCondition = workflow.QueryRejectConditionNotCompletedCleanly
go queryWorkflowFn(queryType, &rejectCondition)
for {
// loop until process the query task
isQueryTask, errInner := poller.PollAndProcessDecisionTask(false, false)
s.Logger.Info("PollAndProcessDecisionTask", tag.Error(err))
s.Nil(errInner)
if isQueryTask {
break
}
} // wait until query result is ready
queryResult = <-queryResultCh
s.NoError(queryResult.Err)
s.NotNil(queryResult.Resp)
s.NotNil(queryResult.Resp.QueryResult)
s.Nil(queryResult.Resp.QueryRejected)
queryResultString = string(queryResult.Resp.QueryResult)
s.Equal("query-result", queryResultString)
}

func (s *integrationSuite) TestQueryWorkflow_BeforeFirstDecision() {
Expand Down
15 changes: 15 additions & 0 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2711,6 +2711,21 @@ func (wh *WorkflowHandler) QueryWorkflow(
if err != nil {
return nil, wh.error(err, scope)
}

// if workflow is closed and a rejection condition is given then check if query should be rejected before proceeding
if response.CloseStatus != nil && queryRequest.QueryRejectCondition != nil {
notOpenReject := queryRequest.GetQueryRejectCondition() == gen.QueryRejectConditionNotOpen
notCompletedCleanlyReject := queryRequest.GetQueryRejectCondition() == gen.QueryRejectConditionNotCompletedCleanly && response.GetCloseStatus() != shared.WorkflowExecutionCloseStatusCompleted
if notOpenReject || notCompletedCleanlyReject {
return &gen.QueryWorkflowResponse{
QueryResult: nil,
QueryRejected: &gen.QueryRejected{
CloseStatus: response.CloseStatus,
},
}, nil
}
}

clientFeature := client.NewFeatureImpl(
response.GetClientLibraryVersion(),
response.GetClientFeatureVersion(),
Expand Down
5 changes: 5 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,11 @@ func (e *historyEngineImpl) getMutableState(
BranchToken: msBuilder.GetCurrentBranch(),
}

if executionInfo.CloseStatus != persistence.WorkflowCloseStatusNone {
closeStatus := getWorkflowExecutionCloseStatus(executionInfo.CloseStatus)
retResp.CloseStatus = &closeStatus
}

if msBuilder.IsStickyTaskListEnabled() {
retResp.StickyTaskList = &workflow.TaskList{Name: common.StringPtr(executionInfo.StickyTaskList)}
retResp.StickyTaskListScheduleToStartTimeout = common.Int32Ptr(executionInfo.StickyScheduleToStartTimeout)
Expand Down
6 changes: 6 additions & 0 deletions tools/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ const (
FlagOutputFormat = "output"
FlagQueryType = "query_type"
FlagQueryTypeWithAlias = FlagQueryType + ", qt"
FlagQueryRejectCondition = "query_reject_condition"
FlagQueryRejectConditionWithAlias = FlagQueryRejectCondition + ", qrc"
FlagShowDetail = "show_detail"
FlagShowDetailWithAlias = FlagShowDetail + ", sd"
FlagActiveClusterName = "active_cluster"
Expand Down Expand Up @@ -436,6 +438,10 @@ func getFlagsForQuery() []cli.Flag {
Usage: "Optional input for the query from JSON file. If there are multiple JSON, concatenate them and separate by space or newline. " +
"Input from file will be overwrite by input from command line",
},
cli.StringFlag{
Name: FlagQueryRejectConditionWithAlias,
Usage: "Optional flag to reject queries based on workflow state. Valid values are \"not_open\" and \"not_completed_cleanly\"",
},
}
}

Expand Down
20 changes: 18 additions & 2 deletions tools/cli/workflowCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,14 +507,30 @@ func queryWorkflowHelper(c *cli.Context, queryType string) {
if input != "" {
queryRequest.Query.QueryArgs = []byte(input)
}
if c.IsSet(FlagQueryRejectCondition) {
var rejectCondition s.QueryRejectCondition
switch c.String(FlagQueryRejectCondition) {
case "not_open":
rejectCondition = s.QueryRejectConditionNotOpen
case "not_completed_cleanly":
rejectCondition = s.QueryRejectConditionNotCompletedCleanly
default:
ErrorAndExit(fmt.Sprintf("invalid reject condition %v, valid values are \"not_open\" and \"not_completed_cleanly\"", c.String(FlagQueryRejectCondition)), nil)
}
queryRequest.QueryRejectCondition = &rejectCondition
}
queryResponse, err := serviceClient.QueryWorkflow(tcCtx, queryRequest)
if err != nil {
ErrorAndExit("Query workflow failed.", err)
return
}

// assume it is json encoded
fmt.Printf("Query result as JSON:\n%v\n", string(queryResponse.QueryResult))
if queryResponse.QueryRejected != nil {
fmt.Printf("Query was rejected, workflow is in state: %v\n", *queryResponse.QueryRejected.CloseStatus)
} else {
// assume it is json encoded
fmt.Printf("Query result as JSON:\n%v\n", string(queryResponse.QueryResult))
}
}

// ListWorkflow list workflow executions based on filters
Expand Down

0 comments on commit cc54edd

Please sign in to comment.