Skip to content

Commit

Permalink
Allow query task for non-active domains (cadence-workflow#1424)
Browse files Browse the repository at this point in the history
Cadence matching completely stops dispatching both DecisionTask
and QueryTask in response to the PollForDecisionTask API when the
domain is marked as non-active.
This change separates the task dispatch channels for queries and
decisions, because we always want to allow sync match on queries
even if the domain is not active.
  • Loading branch information
samarabbas authored Jan 31, 2019
1 parent 5320df3 commit 954647e
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ THRIFTRW_SRCS = \
idl/github.com/uber/cadence/admin.thrift \

PROGS = cadence
TEST_ARG ?= -race -v -timeout 30m
TEST_ARG ?= -race -v -timeout 40m
BUILD := ./build
TOOLS_CMD_ROOT=./cmd/tools
INTEG_TEST_ROOT=./host
Expand Down
113 changes: 113 additions & 0 deletions hostxdc/Integration_domain_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,13 +440,25 @@ func (s *integrationClustersTestSuite) TestSimpleWorkflowFailover() {
return []byte("Activity Result."), false, nil
}

queryType := "test-query"
queryHandler := func(task *workflow.PollForDecisionTaskResponse) ([]byte, error) {
s.NotNil(task.Query)
s.NotNil(task.Query.QueryType)
if *task.Query.QueryType == queryType {
return []byte("query-result"), nil
}

return nil, errors.New("unknown-query-type")
}

poller := host.TaskPoller{
Engine: client1,
Domain: domainName,
TaskList: taskList,
Identity: identity,
DecisionHandler: dtHandler,
ActivityHandler: atHandler,
QueryHandler: queryHandler,
Logger: s.logger,
T: s.T(),
}
Expand All @@ -458,6 +470,7 @@ func (s *integrationClustersTestSuite) TestSimpleWorkflowFailover() {
Identity: identity,
DecisionHandler: dtHandler,
ActivityHandler: atHandler,
QueryHandler: queryHandler,
Logger: s.logger,
T: s.T(),
}
Expand All @@ -467,6 +480,65 @@ func (s *integrationClustersTestSuite) TestSimpleWorkflowFailover() {
s.logger.Infof("PollAndProcessDecisionTask: %v", err)
s.Nil(err)

type QueryResult struct {
Resp *workflow.QueryWorkflowResponse
Err error
}
queryResultCh := make(chan QueryResult)
queryWorkflowFn := func(client wsc.Interface, queryType string) {
queryResp, err := client.QueryWorkflow(createContext(), &workflow.QueryWorkflowRequest{
Domain: common.StringPtr(domainName),
Execution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
RunId: common.StringPtr(*we.RunId),
},
Query: &workflow.WorkflowQuery{
QueryType: common.StringPtr(queryType),
},
})
queryResultCh <- QueryResult{Resp: queryResp, Err: err}
}

// call QueryWorkflow in a separate goroutine (because it is blocking). That will generate a query task
go queryWorkflowFn(client1, queryType)
// process that query task, which should respond via RespondQueryTaskCompleted
for {
// loop until process the query task
isQueryTask, errInner := poller.PollAndProcessDecisionTask(false, false)
s.logger.Infof("PollAndProcessQueryTask: %v", err)
s.Nil(errInner)
if isQueryTask {
break
}
}
// wait until query result is ready
queryResult := <-queryResultCh
s.NoError(queryResult.Err)
s.NotNil(queryResult.Resp)
s.NotNil(queryResult.Resp.QueryResult)
queryResultString := string(queryResult.Resp.QueryResult)
s.Equal("query-result", queryResultString)

// call QueryWorkflow in a separate goroutine (because it is blocking). That will generate a query task
go queryWorkflowFn(client2, queryType)
// process that query task, which should respond via RespondQueryTaskCompleted
for {
// loop until process the query task
isQueryTask, errInner := poller2.PollAndProcessDecisionTask(false, false)
s.logger.Infof("PollAndProcessQueryTask: %v", err)
s.Nil(errInner)
if isQueryTask {
break
}
}
// wait until query result is ready
queryResult = <-queryResultCh
s.NoError(queryResult.Err)
s.NotNil(queryResult.Resp)
s.NotNil(queryResult.Resp.QueryResult)
queryResultString = string(queryResult.Resp.QueryResult)
s.Equal("query-result", queryResultString)

// update domain to fail over
updateReq := &workflow.UpdateDomainRequest{
Name: common.StringPtr(domainName),
Expand Down Expand Up @@ -504,6 +576,47 @@ func (s *integrationClustersTestSuite) TestSimpleWorkflowFailover() {
s.Nil(err)
s.True(eventsReplicated)

// Make sure query is still working after failover
// call QueryWorkflow in a separate goroutine (because it is blocking). That will generate a query task
go queryWorkflowFn(client1, queryType)
// process that query task, which should respond via RespondQueryTaskCompleted
for {
// loop until process the query task
isQueryTask, errInner := poller.PollAndProcessDecisionTask(false, false)
s.logger.Infof("PollAndProcessDecisionTask: %v", err)
s.Nil(errInner)
if isQueryTask {
break
}
}
// wait until query result is ready
queryResult = <-queryResultCh
s.NoError(queryResult.Err)
s.NotNil(queryResult.Resp)
s.NotNil(queryResult.Resp.QueryResult)
queryResultString = string(queryResult.Resp.QueryResult)
s.Equal("query-result", queryResultString)

// call QueryWorkflow in a separate goroutine (because it is blocking). That will generate a query task
go queryWorkflowFn(client2, queryType)
// process that query task, which should respond via RespondQueryTaskCompleted
for {
// loop until process the query task
isQueryTask, errInner := poller2.PollAndProcessDecisionTask(false, false)
s.logger.Infof("PollAndProcessDecisionTask: %v", err)
s.Nil(errInner)
if isQueryTask {
break
}
}
// wait until query result is ready
queryResult = <-queryResultCh
s.NoError(queryResult.Err)
s.NotNil(queryResult.Resp)
s.NotNil(queryResult.Resp.QueryResult)
queryResultString = string(queryResult.Resp.QueryResult)
s.Equal("query-result", queryResultString)

// make process in cluster 2
err = poller2.PollAndProcessActivityTask(false)
s.logger.Infof("PollAndProcessActivityTask 2: %v", err)
Expand Down
15 changes: 13 additions & 2 deletions service/matching/taskListManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ func newTaskListManagerWithRateLimiter(
metricsClient: e.metricsClient,
taskAckManager: newAckManager(e.logger),
tasksForPoll: make(chan *getTaskResult),
queryTasksForPoll: make(chan *getTaskResult),
config: config,
pollerHistory: newPollerHistory(),
outstandingPollsMap: make(map[string]context.CancelFunc),
Expand Down Expand Up @@ -280,7 +281,11 @@ type taskListManagerImpl struct {
// only if there is waiting poll that consumes from it. Tasks in taskBuffer will blocking-add to
// this channel
tasksForPoll chan *getTaskResult
notifyCh chan struct{} // Used as signal to notify pump of new tasks
// queryTasksForPoll is used for delivering query tasks to pollers.
// It must be unbuffered as query tasks are always Sync Matched. We use a separate channel for query tasks because
// unlike activity/decision tasks, query tasks are enabled for dispatch on both active and standby clusters
queryTasksForPoll chan *getTaskResult
notifyCh chan struct{} // Used as signal to notify pump of new tasks
// Note: We need two shutdown channels so we can stop task pump independently of the deliverBuffer
// loop in getTasksPump in unit tests
shutdownCh chan struct{} // Delivers stop to the pump that populates taskBuffer
Expand Down Expand Up @@ -404,7 +409,7 @@ func (c *taskListManagerImpl) SyncMatchQueryTask(ctx context.Context, queryTask

request := &getTaskResult{task: taskInfo, C: make(chan *syncMatchResponse, 1), queryTask: queryTask}
select {
case c.tasksForPoll <- request:
case c.queryTasksForPoll <- request:
<-request.C
return nil
case <-ctx.Done():
Expand Down Expand Up @@ -563,6 +568,12 @@ func (c *taskListManagerImpl) getTask(ctx context.Context) (*getTaskResult, erro
}
c.metricsClient.IncCounter(scope, metrics.PollSuccessCounter)
return result, nil
case result := <-c.queryTasksForPoll:
if result.syncMatch {
c.metricsClient.IncCounter(scope, metrics.PollSuccessWithSyncCounter)
}
c.metricsClient.IncCounter(scope, metrics.PollSuccessCounter)
return result, nil
case <-timer.C:
c.metricsClient.IncCounter(scope, metrics.PollTimeoutCounter)
return nil, ErrNoTasks
Expand Down

0 comments on commit 954647e

Please sign in to comment.