diff --git a/client/matching/client.go b/client/matching/client.go index d13f148488e..91fd8e4c303 100644 --- a/client/matching/client.go +++ b/client/matching/client.go @@ -110,11 +110,23 @@ func (c *clientImpl) PollForDecisionTask( } func (c *clientImpl) QueryWorkflow(ctx context.Context, queryRequest *m.QueryWorkflowRequest, opts ...yarpc.CallOption) (*workflow.QueryWorkflowResponse, error) { - return nil, &workflow.InternalServiceError{Message: "Not implemented yet"} + client, err := c.getHostForRequest(*queryRequest.TaskList.Name) + if err != nil { + return nil, err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.QueryWorkflow(ctx, queryRequest) } func (c *clientImpl) RespondQueryTaskCompleted(ctx context.Context, request *m.RespondQueryTaskCompletedRequest, opts ...yarpc.CallOption) error { - return &workflow.InternalServiceError{Message: "Not implemented yet"} + client, err := c.getHostForRequest(*request.TaskList.Name) + if err != nil { + return err + } + ctx, cancel := c.createContext(ctx) + defer cancel() + return client.RespondQueryTaskCompleted(ctx, request) } func (c *clientImpl) getHostForRequest(key string) (matchingserviceclient.Interface, error) { diff --git a/common/jsonTaskTokenSerializer.go b/common/jsonTaskTokenSerializer.go index 93526375b84..6b01146cc91 100644 --- a/common/jsonTaskTokenSerializer.go +++ b/common/jsonTaskTokenSerializer.go @@ -43,3 +43,16 @@ func (j *jsonTaskTokenSerializer) Deserialize(data []byte) (*TaskToken, error) { return &token, err } + +func (j *jsonTaskTokenSerializer) SerializeQueryTaskToken(token *QueryTaskToken) ([]byte, error) { + data, err := json.Marshal(token) + + return data, err +} + +func (j *jsonTaskTokenSerializer) DeserializeQueryTaskToken(data []byte) (*QueryTaskToken, error) { + var token QueryTaskToken + err := json.Unmarshal(data, &token) + + return &token, err +} diff --git a/common/logging/events.go b/common/logging/events.go index d0c277cb4c2..dd6f8660c2a 100644 --- a/common/logging/events.go +++ b/common/logging/events.go @@ -74,6 +74,10 @@ const ( TaskListUnloaded = 5003 TaskListLoadingFailed = 5004 + // Query task events + InvalidQueryTaskEventID = 6000 + QueryTaskFailedEventID = 6001 + // General purpose events OperationFailed = 9000 OperationPanic = 9001 diff --git a/common/logging/helpers.go b/common/logging/helpers.go index 9f9f2e3794b..cb672c8aab7 100644 --- a/common/logging/helpers.go +++ b/common/logging/helpers.go @@ -331,3 +331,24 @@ func LogTaskListUnloadedEvent(logger bark.Logger) { TagWorkflowEventID: TaskListUnloaded, }).Info("Unloaded TaskList.") } + +// LogQueryTaskMissingWorkflowTypeErrorEvent is used to log invalid query task that is missing workflow type +func LogQueryTaskMissingWorkflowTypeErrorEvent(logger bark.Logger, workflowID, runID, queryType string) { + logger.WithFields(bark.Fields{ + TagWorkflowEventID: InvalidQueryTaskEventID, + "WorkflowID": workflowID, + "RunID": runID, + "QueryType": queryType, + }).Error("Cannot get WorkflowType for QueryTask.") +} + +// LogQueryTaskFailedEvent is used to log query task failure +func LogQueryTaskFailedEvent(logger bark.Logger, domain, workflowID, runID, queryType, errMsg string) { + logger.WithFields(bark.Fields{ + TagWorkflowEventID: QueryTaskFailedEventID, + "Domain": domain, + "WorkflowID": workflowID, + "RunID": runID, + "QueryType": queryType, + }).Info(errMsg) +} diff --git a/common/metrics/defs.go b/common/metrics/defs.go index d15d27e4a2a..ee32db96059 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -217,6 +217,8 @@ const ( FrontendRecordActivityTaskHeartbeatScope // FrontendRespondDecisionTaskCompletedScope is the metric scope for frontend.RespondDecisionTaskCompleted FrontendRespondDecisionTaskCompletedScope + // FrontendRespondQueryTaskCompletedScope is the metric scope for frontend.RespondQueryTaskCompleted + FrontendRespondQueryTaskCompletedScope // FrontendRespondActivityTaskCompletedScope is the metric scope for frontend.RespondActivityTaskCompleted FrontendRespondActivityTaskCompletedScope // FrontendRespondActivityTaskFailedScope is the metric scope for frontend.RespondActivityTaskFailed @@ -243,6 +245,8 @@ const ( FrontendUpdateDomainScope // FrontendDeprecateDomainScope is the metric scope for frontend.DeprecateDomain FrontendDeprecateDomainScope + // FrontendQueryWorkflowScope is the metric scope for frontend.QueryWorkflow + FrontendQueryWorkflowScope NumFrontendScopes ) @@ -319,6 +323,10 @@ const ( MatchingAddDecisionTaskScope // MatchingTaskListMgrScope is the metrics scope for matching.TaskListManager component MatchingTaskListMgrScope + // MatchingQueryWorkflowScope tracks AddDecisionTask API calls received by service + MatchingQueryWorkflowScope + // MatchingRespondQueryTaskCompletedScope tracks AddDecisionTask API calls received by service + MatchingRespondQueryTaskCompletedScope NumMatchingScopes ) @@ -381,6 +389,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ FrontendPollForActivityTaskScope: {operation: "PollForActivityTask"}, FrontendRecordActivityTaskHeartbeatScope: {operation: "RecordActivityTaskHeartbeat"}, FrontendRespondDecisionTaskCompletedScope: {operation: "RespondDecisionTaskCompleted"}, + FrontendRespondQueryTaskCompletedScope: {operation: "RespondQueryTaskCompleted"}, FrontendRespondActivityTaskCompletedScope: {operation: "RespondActivityTaskCompleted"}, FrontendRespondActivityTaskFailedScope: {operation: "RespondActivityTaskFailed"}, FrontendRespondActivityTaskCanceledScope: {operation: "RespondActivityTaskCanceled"}, @@ -394,6 +403,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ FrontendDescribeDomainScope: {operation: "DescribeDomain"}, FrontendUpdateDomainScope: {operation: "UpdateDomain"}, FrontendDeprecateDomainScope: {operation: "DeprecateDomain"}, + FrontendQueryWorkflowScope: {operation: "QueryWorkflow"}, }, // History Scope Names History: { @@ -427,11 +437,13 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ }, // Matching Scope Names Matching: { - MatchingPollForDecisionTaskScope: {operation: "PollForDecisionTask"}, - MatchingPollForActivityTaskScope: {operation: "PollForActivityTask"}, - MatchingAddActivityTaskScope: {operation: "AddActivityTask"}, - MatchingAddDecisionTaskScope: {operation: "AddDecisionTask"}, - MatchingTaskListMgrScope: {operation: "TaskListMgr"}, + MatchingPollForDecisionTaskScope: {operation: "PollForDecisionTask"}, + MatchingPollForActivityTaskScope: {operation: "PollForActivityTask"}, + MatchingAddActivityTaskScope: {operation: "AddActivityTask"}, + MatchingAddDecisionTaskScope: {operation: "AddDecisionTask"}, + MatchingTaskListMgrScope: {operation: "TaskListMgr"}, + MatchingQueryWorkflowScope: {operation: "QueryWorkflow"}, + MatchingRespondQueryTaskCompletedScope: {operation: "RespondQueryTaskCompleted"}, }, } @@ -446,6 +458,7 @@ const ( CadenceErrExecutionAlreadyStartedCounter CadenceErrDomainAlreadyExistsCounter CadenceErrCancellationAlreadyRequestedCounter + CadenceErrQueryFailedCounter PersistenceRequests PersistenceFailures PersistenceLatency @@ -509,6 +522,7 @@ const ( LeaseRequestCounter LeaseFailureCounter ConditionFailedErrorCounter + RespondQueryTaskFailedCounter ) // MetricDefs record the metrics for all services @@ -574,13 +588,14 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ RemoveEngineForShardLatency: {metricName: "remove-engine-for-shard-latency", metricType: Timer}, }, Matching: { - PollSuccessCounter: {metricName: "poll.success"}, - PollTimeoutCounter: {metricName: "poll.timeouts"}, - PollErrorsCounter: {metricName: "poll.errors"}, - PollSuccessWithSyncCounter: {metricName: "poll.success.sync"}, - LeaseRequestCounter: {metricName: "lease.requests"}, - LeaseFailureCounter: {metricName: "lease.failures"}, - ConditionFailedErrorCounter: {metricName: "condition-failed-errors"}, + PollSuccessCounter: {metricName: "poll.success"}, + PollTimeoutCounter: {metricName: "poll.timeouts"}, + PollErrorsCounter: {metricName: "poll.errors"}, + PollSuccessWithSyncCounter: {metricName: "poll.success.sync"}, + LeaseRequestCounter: {metricName: "lease.requests"}, + LeaseFailureCounter: {metricName: "lease.failures"}, + ConditionFailedErrorCounter: {metricName: "condition-failed-errors"}, + RespondQueryTaskFailedCounter: {metricName: "respond-query-failed"}, }, } diff --git a/common/taskTokenSerializerInterfaces.go b/common/taskTokenSerializerInterfaces.go index c97882d1036..169ea084e97 100644 --- a/common/taskTokenSerializerInterfaces.go +++ b/common/taskTokenSerializerInterfaces.go @@ -25,6 +25,8 @@ type ( TaskTokenSerializer interface { Serialize(token *TaskToken) ([]byte, error) Deserialize(data []byte) (*TaskToken, error) + SerializeQueryTaskToken(token *QueryTaskToken) ([]byte, error) + DeserializeQueryTaskToken(data []byte) (*QueryTaskToken, error) } // TaskToken identifies a task @@ -34,4 +36,11 @@ type ( RunID string `json:"runId"` ScheduleID int64 `json:"scheduleId"` } + + // QueryTaskToken identifies a query task + QueryTaskToken struct { + DomainID string `json:"domainId"` + TaskList string `jaon:"taskList"` + TaskID string `jaon:"taskId"` + } ) diff --git a/host/integration_test.go b/host/integration_test.go index 6cc130aa432..529ad5858f0 100644 --- a/host/integration_test.go +++ b/host/integration_test.go @@ -38,6 +38,8 @@ import ( "encoding/binary" "strconv" + "errors" + wsc "github.com/uber/cadence/.gen/go/cadence/workflowserviceclient" workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/client/frontend" @@ -75,6 +77,8 @@ type ( activityTaskHandler func(execution *workflow.WorkflowExecution, activityType *workflow.ActivityType, activityID string, startedEventID int64, input []byte, takeToken []byte) ([]byte, bool, error) + queryHandler func(task *workflow.PollForDecisionTaskResponse) ([]byte, error) + taskPoller struct { engine frontend.Client domain string @@ -82,6 +86,7 @@ type ( identity string decisionHandler decisionTaskHandler activityHandler activityTaskHandler + queryHandler queryHandler logger bark.Logger } ) @@ -538,6 +543,83 @@ retry: return matching.ErrNoTasks } +func (p *taskPoller) pollAndProcessQueryTask(dumpHistory bool, dropTask bool) error { +retry: + for attempt := 0; attempt < 5; attempt++ { + response, err1 := p.engine.PollForDecisionTask(createContext(), &workflow.PollForDecisionTaskRequest{ + Domain: common.StringPtr(p.domain), + TaskList: p.taskList, + Identity: common.StringPtr(p.identity), + }) + + if err1 == history.ErrDuplicate { + p.logger.Info("Duplicate Decision task: Polling again.") + continue retry + } + + if err1 != nil { + return err1 + } + + if response == nil || len(response.TaskToken) == 0 { + p.logger.Info("Empty Decision task: Polling again.") + continue retry + } + + history := response.History + if history == nil { + p.logger.Fatal("History is nil") + } + + events := history.Events + if events == nil || len(events) == 0 { + p.logger.Fatalf("History Events are empty: %v", events) + } + + nextPageToken := response.NextPageToken + for nextPageToken != nil { + resp, err2 := p.engine.GetWorkflowExecutionHistory(createContext(), &workflow.GetWorkflowExecutionHistoryRequest{ + Domain: common.StringPtr(p.domain), + Execution: response.WorkflowExecution, + NextPageToken: nextPageToken, + }) + + if err2 != nil { + return err2 + } + + events = append(events, resp.History.Events...) + nextPageToken = resp.NextPageToken + } + + if dropTask { + p.logger.Info("Dropping Decision task: ") + return nil + } + + if dumpHistory { + common.PrettyPrintHistory(response.History, p.logger) + } + + blob, err := p.queryHandler(response) + + completeRequest := &workflow.RespondQueryTaskCompletedRequest{TaskToken: response.TaskToken} + if err != nil { + completeType := workflow.QueryTaskCompletedTypeFailed + completeRequest.CompletedType = &completeType + completeRequest.ErrorMessage = common.StringPtr(err.Error()) + } else { + completeType := workflow.QueryTaskCompletedTypeCompleted + completeRequest.CompletedType = &completeType + completeRequest.QueryResult = blob + } + + return p.engine.RespondQueryTaskCompleted(createContext(), completeRequest) + } + + return matching.ErrNoTasks +} + func (p *taskPoller) pollAndProcessActivityTask(dropTask bool) error { retry: for attempt := 0; attempt < 5; attempt++ { @@ -1300,6 +1382,155 @@ func (s *integrationSuite) TestSignalWorkflow() { s.IsType(&workflow.EntityNotExistsError{}, err) } +func (s *integrationSuite) TestQueryWorkflow() { + id := "interation-query-workflow-test" + wt := "interation-query-workflow-test-type" + tl := "interation-query-workflow-test-tasklist" + identity := "worker1" + activityName := "activity_type1" + queryType := "test-query" + + workflowType := &workflow.WorkflowType{} + workflowType.Name = common.StringPtr(wt) + + taskList := &workflow.TaskList{} + taskList.Name = common.StringPtr(tl) + + // Start workflow execution + request := &workflow.StartWorkflowExecutionRequest{ + RequestId: common.StringPtr(uuid.New()), + Domain: common.StringPtr(s.domainName), + WorkflowId: common.StringPtr(id), + WorkflowType: workflowType, + TaskList: taskList, + Input: nil, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1), + Identity: common.StringPtr(identity), + } + + we, err0 := s.engine.StartWorkflowExecution(createContext(), request) + s.Nil(err0) + + s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId) + + // decider logic + workflowComplete := false + activityScheduled := false + activityData := int32(1) + var signalEvent *workflow.HistoryEvent + dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType, + previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision) { + + if !activityScheduled { + activityScheduled = true + buf := new(bytes.Buffer) + s.Nil(binary.Write(buf, binary.LittleEndian, activityData)) + + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask), + ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr(strconv.Itoa(int(1))), + ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)}, + TaskList: &workflow.TaskList{Name: &tl}, + Input: buf.Bytes(), + ScheduleToCloseTimeoutSeconds: common.Int32Ptr(100), + ScheduleToStartTimeoutSeconds: common.Int32Ptr(2), + StartToCloseTimeoutSeconds: common.Int32Ptr(50), + HeartbeatTimeoutSeconds: common.Int32Ptr(5), + }, + }} + } else if previousStartedEventID > 0 { + for _, event := range history.Events[previousStartedEventID:] { + if *event.EventType == workflow.EventTypeWorkflowExecutionSignaled { + signalEvent = event + return nil, []*workflow.Decision{} + } + } + } + + workflowComplete = true + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution), + CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{ + Result: []byte("Done."), + }, + }} + } + + // activity handler + atHandler := func(execution *workflow.WorkflowExecution, activityType *workflow.ActivityType, + activityID string, startedEventID int64, input []byte, taskToken []byte) ([]byte, bool, error) { + + return []byte("Activity Result."), false, nil + } + + queryHandler := func(task *workflow.PollForDecisionTaskResponse) ([]byte, error) { + s.NotNil(task.Query) + s.NotNil(task.Query.QueryType) + if *task.Query.QueryType == queryType { + return []byte("query-result"), nil + } + + return nil, errors.New("unknown-query-type") + } + + poller := &taskPoller{ + engine: s.engine, + domain: s.domainName, + taskList: taskList, + identity: identity, + decisionHandler: dtHandler, + activityHandler: atHandler, + queryHandler: queryHandler, + logger: s.logger, + } + + // Make first decision to schedule activity + err := poller.pollAndProcessDecisionTask(false, false) + s.logger.Infof("pollAndProcessDecisionTask: %v", err) + s.Nil(err) + + type QueryResult struct { + Resp *workflow.QueryWorkflowResponse + Err error + } + queryResultCh := make(chan QueryResult) + queryWorkflowFn := func(queryType string) { + queryResp, err := s.engine.QueryWorkflow(createContext(), &workflow.QueryWorkflowRequest{ + Domain: common.StringPtr(s.domainName), + Execution: &workflow.WorkflowExecution{ + WorkflowId: common.StringPtr(id), + RunId: common.StringPtr(*we.RunId), + }, + Query: &workflow.WorkflowQuery{ + QueryType: common.StringPtr(queryType), + }, + }) + queryResultCh <- QueryResult{Resp: queryResp, Err: err} + } + + // call QueryWorkflow in separate goroutinue (because it is blocking). That will generate a query task + go queryWorkflowFn(queryType) + // process that query task, which should respond via RespondQueryTaskCompleted + err = poller.pollAndProcessQueryTask(false, false) + // wait until query result is ready + queryResult := <-queryResultCh + s.NoError(queryResult.Err) + s.NotNil(queryResult.Resp) + s.NotNil(queryResult.Resp.QueryResult) + queryResultString := string(queryResult.Resp.QueryResult) + s.Equal("query-result", queryResultString) + + go queryWorkflowFn("invalid-query-type") + err = poller.pollAndProcessQueryTask(false, false) + queryResult = <-queryResultCh + s.NotNil(queryResult.Err) + queryFailError, ok := queryResult.Err.(*workflow.QueryFailedError) + s.True(ok) + s.Equal("unknown-query-type", queryFailError.Message) +} + func (s *integrationSuite) TestContinueAsNewWorkflow() { id := "interation-continue-as-new-workflow-test" wt := "interation-continue-as-new-workflow-test-type" diff --git a/service/frontend/handler.go b/service/frontend/handler.go index 93e019e37fa..75347b75463 100644 --- a/service/frontend/handler.go +++ b/service/frontend/handler.go @@ -23,9 +23,12 @@ package frontend import ( "context" "encoding/json" + "errors" "sync" "github.com/pborman/uuid" + "github.com/uber-common/bark" + "github.com/uber-go/tally" "github.com/uber/cadence/.gen/go/cadence/workflowserviceserver" "github.com/uber/cadence/.gen/go/health" "github.com/uber/cadence/.gen/go/health/metaserver" @@ -36,12 +39,10 @@ import ( "github.com/uber/cadence/client/matching" "github.com/uber/cadence/common" "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/logging" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/service" - - "github.com/uber-common/bark" - "github.com/uber-go/tally" ) var _ workflowserviceserver.Interface = (*WorkflowHandler)(nil) @@ -74,6 +75,7 @@ type ( var ( errDomainNotSet = &gen.BadRequestError{Message: "Domain not set on request."} errTaskTokenNotSet = &gen.BadRequestError{Message: "Task token not set on request."} + errInvalidTaskToken = &gen.BadRequestError{Message: "Invalid TaskToken."} errTaskListNotSet = &gen.BadRequestError{Message: "TaskList is not set on request."} errExecutionNotSet = &gen.BadRequestError{Message: "Execution is not set on request."} errWorkflowIDNotSet = &gen.BadRequestError{Message: "WorkflowId is not set on request."} @@ -81,6 +83,8 @@ var ( errInvalidRunID = &gen.BadRequestError{Message: "Invalid RunId."} errInvalidNextPageToken = &gen.BadRequestError{Message: "Invalid NextPageToken."} errNextPageTokenRunIDMismatch = &gen.BadRequestError{Message: "RunID in the request does not match the NextPageToken."} + errQueryNotSet = &gen.BadRequestError{Message: "WorkflowQuery is not set on request."} + errQueryTypeNotSet = &gen.BadRequestError{Message: "QueryType is not set on request."} ) // NewWorkflowHandler creates a thrift handler for the cadence service @@ -400,7 +404,7 @@ func (wh *WorkflowHandler) PollForDecisionTask( } } - return createPollForDecisionTaskResponse(matchingResp, history, continuation), nil + return wh.createPollForDecisionTaskResponse(ctx, matchingResp, history, continuation), nil } // RecordActivityTaskHeartbeat - Record Activity Task Heart beat. @@ -583,7 +587,37 @@ func (wh *WorkflowHandler) RespondDecisionTaskCompleted( func (wh *WorkflowHandler) RespondQueryTaskCompleted( ctx context.Context, completeRequest *gen.RespondQueryTaskCompletedRequest) error { - return &gen.InternalServiceError{Message: "Not implemented yet"} + + scope := metrics.FrontendRespondQueryTaskCompletedScope + sw := wh.startRequestProfile(scope) + defer sw.Stop() + + // Count the request in the RPS, but we still accept it even if RPS is exceeded + wh.rateLimiter.TryConsume(1) + + if completeRequest.TaskToken == nil { + return wh.error(errTaskTokenNotSet, scope) + } + queryTaskToken, err := wh.tokenSerializer.DeserializeQueryTaskToken(completeRequest.TaskToken) + if err != nil { + return wh.error(err, scope) + } + if queryTaskToken.DomainID == "" || queryTaskToken.TaskList == "" || queryTaskToken.TaskID == "" { + return wh.error(errInvalidTaskToken, scope) + } + + matchingRequest := &m.RespondQueryTaskCompletedRequest{ + DomainUUID: common.StringPtr(queryTaskToken.DomainID), + TaskList: &gen.TaskList{Name: common.StringPtr(queryTaskToken.TaskList)}, + TaskID: common.StringPtr(queryTaskToken.TaskID), + CompletedRequest: completeRequest, + } + + err = wh.matching.RespondQueryTaskCompleted(ctx, matchingRequest) + if err != nil { + return wh.error(err, scope) + } + return nil } // StartWorkflowExecution - Creates a new workflow execution @@ -1022,7 +1056,81 @@ func (wh *WorkflowHandler) ListClosedWorkflowExecutions(ctx context.Context, // QueryWorkflow returns query result for a specified workflow execution func (wh *WorkflowHandler) QueryWorkflow(ctx context.Context, queryRequest *gen.QueryWorkflowRequest) (*gen.QueryWorkflowResponse, error) { - return nil, &gen.InternalServiceError{Message: "Not implemented."} + + scope := metrics.FrontendQueryWorkflowScope + sw := wh.startRequestProfile(scope) + defer sw.Stop() + + if queryRequest.Domain == nil { + return nil, wh.error(errDomainNotSet, scope) + } + + if queryRequest.Execution == nil { + return nil, wh.error(errExecutionNotSet, scope) + } + + if queryRequest.Execution.WorkflowId == nil { + return nil, wh.error(errWorkflowIDNotSet, scope) + } + + if queryRequest.Execution.RunId != nil && uuid.Parse(*queryRequest.Execution.RunId) == nil { + return nil, wh.error(errInvalidRunID, scope) + } + + if queryRequest.Query == nil { + return nil, wh.error(errQueryNotSet, scope) + } + + if queryRequest.Query.QueryType == nil { + return nil, wh.error(errQueryTypeNotSet, scope) + } + + domainInfo, _, err := wh.domainCache.GetDomain(queryRequest.GetDomain()) + if err != nil { + return nil, wh.error(err, scope) + } + + matchingRequest := &m.QueryWorkflowRequest{ + DomainUUID: common.StringPtr(domainInfo.ID), + QueryRequest: queryRequest, + } + if queryRequest.Execution.RunId == nil { + // RunID is not set, we would use the running one if it exists. + response, err := wh.history.GetWorkflowExecutionNextEventID(ctx, &h.GetWorkflowExecutionNextEventIDRequest{ + DomainUUID: common.StringPtr(domainInfo.ID), + Execution: queryRequest.Execution, + }) + if err != nil { + return nil, wh.error(err, scope) + } + + queryRequest.Execution.RunId = response.RunId + matchingRequest.TaskList = response.Tasklist + } else { + // Get the TaskList from history (first event) + history, _, err := wh.getHistory(domainInfo.ID, *queryRequest.Execution, 2, 1, nil) + if err != nil { + return nil, wh.error(err, scope) + } + if len(history.Events) == 0 || history.Events[0].GetEventType() != gen.EventTypeWorkflowExecutionStarted { + // this should not happen + return nil, wh.error(errors.New("invalid history events"), scope) + } + matchingRequest.TaskList = history.Events[0].WorkflowExecutionStartedEventAttributes.TaskList + } + + matchingResp, err := wh.matching.QueryWorkflow(ctx, matchingRequest) + if err != nil { + logging.LogQueryTaskFailedEvent(wh.GetLogger(), + *queryRequest.Domain, + *queryRequest.Execution.WorkflowId, + *queryRequest.Execution.RunId, + *queryRequest.Query.QueryType, + err.Error()) + return nil, wh.error(err, scope) + } + + return matchingResp, nil } func (wh *WorkflowHandler) getHistory(domainID string, execution gen.WorkflowExecution, @@ -1118,6 +1226,9 @@ func (wh *WorkflowHandler) error(err error, scope int) error { case *gen.CancellationAlreadyRequestedError: wh.metricsClient.IncCounter(scope, metrics.CadenceErrCancellationAlreadyRequestedCounter) return err + case *gen.QueryFailedError: + wh.metricsClient.IncCounter(scope, metrics.CadenceErrQueryFailedCounter) + return err default: wh.metricsClient.IncCounter(scope, metrics.CadenceFailures) return &gen.InternalServiceError{Message: err.Error()} @@ -1176,7 +1287,7 @@ func createDomainResponse(info *persistence.DomainInfo, config *persistence.Doma return i, c } -func createPollForDecisionTaskResponse( +func (wh *WorkflowHandler) createPollForDecisionTaskResponse(ctx context.Context, matchingResponse *m.PollForDecisionTaskResponse, history *gen.History, nextPageToken []byte) *gen.PollForDecisionTaskResponse { resp := &gen.PollForDecisionTaskResponse{} if matchingResponse != nil { @@ -1185,7 +1296,41 @@ func createPollForDecisionTaskResponse( resp.WorkflowType = matchingResponse.WorkflowType resp.PreviousStartedEventId = matchingResponse.PreviousStartedEventId resp.StartedEventId = matchingResponse.StartedEventId + resp.Query = matchingResponse.Query } + + if matchingResponse.Query != nil && resp.WorkflowType == nil { + // for query task, the matching engine was not able to populate the WorkflowType, so set here from history + if history != nil && len(history.Events) > 0 && + history.Events[0].WorkflowExecutionStartedEventAttributes != nil { + resp.WorkflowType = history.Events[0].WorkflowExecutionStartedEventAttributes.WorkflowType + } else { + // this should never happen, but if it does happen, log it and respond error back to query client. + logging.LogQueryTaskMissingWorkflowTypeErrorEvent(wh.GetLogger(), + *matchingResponse.WorkflowExecution.WorkflowId, + *matchingResponse.WorkflowExecution.RunId, + *resp.Query.QueryType) + + queryTaskToken, err := wh.tokenSerializer.DeserializeQueryTaskToken(matchingResponse.TaskToken) + if err == nil { + completeType := gen.QueryTaskCompletedTypeFailed + wh.matching.RespondQueryTaskCompleted(ctx, &m.RespondQueryTaskCompletedRequest{ + DomainUUID: common.StringPtr(queryTaskToken.DomainID), + TaskList: &gen.TaskList{Name: common.StringPtr(queryTaskToken.TaskList)}, + TaskID: common.StringPtr(queryTaskToken.TaskID), + CompletedRequest: &gen.RespondQueryTaskCompletedRequest{ + TaskToken: matchingResponse.TaskToken, + CompletedType: &completeType, + ErrorMessage: common.StringPtr("server internal error: cannot get WorkflowType for QueryTask"), + }, + }) + } + + // in this case, just return empty response for the pool request and client will just ignore + return &gen.PollForDecisionTaskResponse{} + } + } + resp.History = history resp.NextPageToken = nextPageToken return resp diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index dea72e1a6b9..cde73b1b09b 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -309,6 +309,7 @@ func (e *historyEngineImpl) GetWorkflowExecutionNextEventID( result := &h.GetWorkflowExecutionNextEventIDResponse{} result.EventId = common.Int64Ptr(msBuilder.GetNextEventID()) result.RunId = context.workflowExecution.RunId + result.Tasklist = &workflow.TaskList{Name: common.StringPtr(context.msBuilder.executionInfo.TaskList)} return result, nil } diff --git a/service/matching/handler.go b/service/matching/handler.go index 925d5cc5e31..6569702c12b 100644 --- a/service/matching/handler.go +++ b/service/matching/handler.go @@ -121,10 +121,8 @@ func (h *Handler) PollForActivityTask(ctx context.Context, sw := h.startRequestProfile("PollForActivityTask", scope) defer sw.Stop() - response, error := h.engine.PollForActivityTask(ctx, pollRequest) - h.Service.GetLogger().Debug("Engine returned from PollForActivityTask") - return response, h.handleErr(error, scope) - + response, err := h.engine.PollForActivityTask(ctx, pollRequest) + return response, h.handleErr(err, scope) } // PollForDecisionTask - long poll for a decision task. @@ -135,20 +133,29 @@ func (h *Handler) PollForDecisionTask(ctx context.Context, sw := h.startRequestProfile("PollForDecisionTask", scope) defer sw.Stop() - response, error := h.engine.PollForDecisionTask(ctx, pollRequest) - h.Service.GetLogger().Debug("Engine returned from PollForDecisionTask") - return response, h.handleErr(error, scope) + response, err := h.engine.PollForDecisionTask(ctx, pollRequest) + return response, h.handleErr(err, scope) } // QueryWorkflow queries a given workflow synchronously and return the query result. func (h *Handler) QueryWorkflow(ctx context.Context, queryRequest *m.QueryWorkflowRequest) (*gen.QueryWorkflowResponse, error) { - return nil, &gen.InternalServiceError{Message: "Not implemented yet"} + scope := metrics.MatchingQueryWorkflowScope + sw := h.startRequestProfile("QueryWorkflow", scope) + defer sw.Stop() + + response, err := h.engine.QueryWorkflow(ctx, queryRequest) + return response, h.handleErr(err, scope) } // RespondQueryTaskCompleted responds a query task completed func (h *Handler) RespondQueryTaskCompleted(ctx context.Context, request *m.RespondQueryTaskCompletedRequest) error { - return &gen.InternalServiceError{Message: "Not implemented yet"} + scope := metrics.MatchingRespondQueryTaskCompletedScope + sw := h.startRequestProfile("RespondQueryTaskCompleted", scope) + defer sw.Stop() + + err := h.engine.RespondQueryTaskCompleted(ctx, request) + return h.handleErr(err, scope) } func (h *Handler) handleErr(err error, scope int) error { @@ -173,6 +180,9 @@ func (h *Handler) handleErr(err error, scope int) error { case *gen.DomainAlreadyExistsError: h.metricsClient.IncCounter(scope, metrics.CadenceErrDomainAlreadyExistsCounter) return err + case *gen.QueryFailedError: + h.metricsClient.IncCounter(scope, metrics.CadenceErrQueryFailedCounter) + return err default: h.metricsClient.IncCounter(scope, metrics.CadenceFailures) return &gen.InternalServiceError{Message: err.Error()} diff --git a/service/matching/matchingEngine.go b/service/matching/matchingEngine.go index d675429af02..e9fd056e8b2 100644 --- a/service/matching/matchingEngine.go +++ b/service/matching/matchingEngine.go @@ -23,13 +23,11 @@ package matching import ( "context" "errors" + "math" "sync" "github.com/pborman/uuid" "github.com/uber-common/bark" - - "math" - h "github.com/uber/cadence/.gen/go/history" m "github.com/uber/cadence/.gen/go/matching" workflow "github.com/uber/cadence/.gen/go/shared" @@ -52,6 +50,11 @@ type matchingEngineImpl struct { taskListsLock sync.RWMutex // locks mutation of taskLists taskLists map[taskListID]taskListManager // Convert to LRU cache config *Config + queryMapLock sync.Mutex + // map from query TaskID (which is a UUID generated in QueryWorkflow() call) to a channel that QueryWorkflow() + // will block and wait for. The RespondQueryTaskCompleted() call will send the data through that channel which will + // unblock QueryWorkflow() call. + queryTaskMap map[string]chan *workflow.RespondQueryTaskCompletedRequest } type taskListID struct { @@ -105,6 +108,7 @@ func NewEngine(taskManager persistence.TaskManager, }), metricsClient: metricsClient, config: config, + queryTaskMap: make(map[string]chan *workflow.RespondQueryTaskCompletedRequest), } } @@ -242,6 +246,35 @@ pollLoop: return nil, err } + if tCtx.queryTaskInfo != nil { + // for query task, we don't need to update history to record decision task started. but we need to know + // the NextEventID so front end knows what are the history events to load for this decision task. + nextIDResp, err := e.historyService.GetWorkflowExecutionNextEventID(ctx, &h.GetWorkflowExecutionNextEventIDRequest{ + DomainUUID: req.DomainUUID, + Execution: &tCtx.workflowExecution, + }) + if err != nil { + // will notify query client that the query task failed + completeType := workflow.QueryTaskCompletedTypeFailed + e.RespondQueryTaskCompleted(ctx, &m.RespondQueryTaskCompletedRequest{ + TaskID: common.StringPtr(tCtx.queryTaskInfo.taskID), + CompletedRequest: &workflow.RespondQueryTaskCompletedRequest{ + CompletedType: &completeType, + ErrorMessage: common.StringPtr("server internal error: failed to get nextID " + err.Error()), + }, + }) + return emptyPollForDecisionTaskResponse, nil + } + + var lastEventID = *nextIDResp.EventId - 1 + resp := &h.RecordDecisionTaskStartedResponse{ + PreviousStartedEventId: &lastEventID, + StartedEventId: &lastEventID, + } + tCtx.completeTask(nil) + return e.createPollForDecisionTaskResponse(tCtx, resp), nil + } + // Generate a unique requestId for this task which will be used for all retries requestID := uuid.New() resp, err := tCtx.RecordDecisionTaskStartedWithRetry(&h.RecordDecisionTaskStartedRequest{ @@ -321,6 +354,60 @@ pollLoop: } } +// QueryWorkflow creates a DecisionTask with query data, send it through sync match channel, wait for that DecisionTask +// to be processed by worker, and then return the query result. +func (e *matchingEngineImpl) QueryWorkflow(ctx context.Context, queryRequest *m.QueryWorkflowRequest) (*workflow.QueryWorkflowResponse, error) { + domainID := queryRequest.GetDomainUUID() + taskListName := *queryRequest.TaskList.Name + taskList := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision) + tlMgr, err := e.getTaskListManager(taskList) + if err != nil { + return nil, err + } + queryTask := &queryTaskInfo{ + queryRequest: queryRequest, + taskID: uuid.New(), + } + err = tlMgr.SyncMatchQueryTask(ctx, queryTask) + if err != nil { + return nil, err + } + + queryResultCh := make(chan *workflow.RespondQueryTaskCompletedRequest, 1) + e.queryMapLock.Lock() + e.queryTaskMap[queryTask.taskID] = queryResultCh + e.queryMapLock.Unlock() + defer func() { + e.queryMapLock.Lock() + delete(e.queryTaskMap, queryTask.taskID) + e.queryMapLock.Unlock() + }() + + select { + case result := <-queryResultCh: + if *result.CompletedType == workflow.QueryTaskCompletedTypeFailed { + return nil, &workflow.QueryFailedError{Message: result.GetErrorMessage()} + } + return &workflow.QueryWorkflowResponse{QueryResult: result.QueryResult}, nil + case <-ctx.Done(): + return nil, &workflow.QueryFailedError{Message: "timeout: workflow worker is not responding"} + } +} + +func (e *matchingEngineImpl) RespondQueryTaskCompleted(ctx context.Context, request *m.RespondQueryTaskCompletedRequest) error { + e.queryMapLock.Lock() + queryResultCh, ok := e.queryTaskMap[request.GetTaskID()] + e.queryMapLock.Unlock() + if !ok { + e.metricsClient.IncCounter(metrics.MatchingRespondQueryTaskCompletedScope, metrics.RespondQueryTaskFailedCounter) + return &workflow.EntityNotExistsError{Message: "query task not found, or already expired"} + } + + queryResultCh <- request.CompletedRequest + + return nil +} + // Loads a task from persistence and wraps it in a task context func (e *matchingEngineImpl) getTask(ctx context.Context, taskList *taskListID) (*taskContext, error) { tlMgr, err := e.getTaskListManager(taskList) @@ -349,16 +436,27 @@ func (e *matchingEngineImpl) createPollForDecisionTaskResponse(context *taskCont response := &m.PollForDecisionTaskResponse{} response.WorkflowExecution = workflowExecutionPtr(context.workflowExecution) - token := &common.TaskToken{ - DomainID: task.DomainID, - WorkflowID: task.WorkflowID, - RunID: task.RunID, - ScheduleID: task.ScheduleID, + if context.queryTaskInfo != nil { + // for a query task + queryRequest := context.queryTaskInfo.queryRequest + token := &common.QueryTaskToken{ + DomainID: *queryRequest.DomainUUID, + TaskList: *queryRequest.TaskList.Name, + TaskID: context.queryTaskInfo.taskID, + } + response.TaskToken, _ = e.tokenSerializer.SerializeQueryTaskToken(token) + response.Query = context.queryTaskInfo.queryRequest.QueryRequest.Query + } else { + token := &common.TaskToken{ + DomainID: task.DomainID, + WorkflowID: task.WorkflowID, + RunID: task.RunID, + ScheduleID: task.ScheduleID, + } + response.TaskToken, _ = e.tokenSerializer.Serialize(token) + response.WorkflowType = historyResponse.WorkflowType } - response.TaskToken, _ = e.tokenSerializer.Serialize(token) - response.WorkflowType = historyResponse.WorkflowType - if historyResponse.PreviousStartedEventId == nil || - *historyResponse.PreviousStartedEventId != common.EmptyEventID { + if historyResponse.GetPreviousStartedEventId() != common.EmptyEventID { response.PreviousStartedEventId = historyResponse.PreviousStartedEventId } response.StartedEventId = historyResponse.StartedEventId diff --git a/service/matching/matchingEngineInterfaces.go b/service/matching/matchingEngineInterfaces.go index 66c990b07cb..c053502e81f 100644 --- a/service/matching/matchingEngineInterfaces.go +++ b/service/matching/matchingEngineInterfaces.go @@ -22,6 +22,7 @@ package matching import ( "context" + m "github.com/uber/cadence/.gen/go/matching" workflow "github.com/uber/cadence/.gen/go/shared" ) @@ -34,5 +35,7 @@ type ( AddActivityTask(addRequest *m.AddActivityTaskRequest) error PollForDecisionTask(ctx context.Context, request *m.PollForDecisionTaskRequest) (*m.PollForDecisionTaskResponse, error) PollForActivityTask(ctx context.Context, request *m.PollForActivityTaskRequest) (*workflow.PollForActivityTaskResponse, error) + QueryWorkflow(ctx context.Context, request *m.QueryWorkflowRequest) (*workflow.QueryWorkflowResponse, error) + RespondQueryTaskCompleted(ctx context.Context, request *m.RespondQueryTaskCompletedRequest) error } ) diff --git a/service/matching/taskListManager.go b/service/matching/taskListManager.go index 8c721529d37..6fd7428e4dc 100644 --- a/service/matching/taskListManager.go +++ b/service/matching/taskListManager.go @@ -28,6 +28,7 @@ import ( "github.com/uber-common/bark" h "github.com/uber/cadence/.gen/go/history" + m "github.com/uber/cadence/.gen/go/matching" s "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" "github.com/uber/cadence/common/backoff" @@ -46,6 +47,7 @@ type taskListManager interface { Stop() AddTask(execution *s.WorkflowExecution, taskInfo *persistence.TaskInfo) error GetTaskContext(ctx context.Context) (*taskContext, error) + SyncMatchQueryTask(ctx context.Context, queryTask *queryTaskInfo) error String() string } @@ -79,6 +81,12 @@ type taskContext struct { info *persistence.TaskInfo syncResponseCh chan<- *syncMatchResponse workflowExecution s.WorkflowExecution + queryTaskInfo *queryTaskInfo +} + +type queryTaskInfo struct { + taskID string + queryRequest *m.QueryWorkflowRequest } // Single task list in memory state @@ -113,8 +121,9 @@ type taskListManagerImpl struct { // getTaskResult contains task info and optional channel to notify createTask caller // that task is successfully started and returned to a poller type getTaskResult struct { - task *persistence.TaskInfo - C chan *syncMatchResponse + task *persistence.TaskInfo + C chan *syncMatchResponse + queryTask *queryTaskInfo } // syncMatchResponse result of sync match delivered to a createTask caller @@ -170,6 +179,27 @@ func (c *taskListManagerImpl) AddTask(execution *s.WorkflowExecution, taskInfo * return err } +func (c *taskListManagerImpl) SyncMatchQueryTask(ctx context.Context, queryTask *queryTaskInfo) error { + c.startWG.Wait() + + domainID := queryTask.queryRequest.GetDomainUUID() + we := queryTask.queryRequest.QueryRequest.Execution + taskInfo := &persistence.TaskInfo{ + DomainID: domainID, + RunID: we.GetRunId(), + WorkflowID: we.GetWorkflowId(), + } + + request := &getTaskResult{task: taskInfo, C: make(chan *syncMatchResponse, 1), queryTask: queryTask} + select { + case c.syncMatch <- request: + <-request.C + return nil + case <-ctx.Done(): + return &s.QueryFailedError{Message: "timeout: no workflow worker polling for given tasklist"} + } +} + // Loads a task from DB or from sync match and wraps it in a task context func (c *taskListManagerImpl) GetTaskContext(ctx context.Context) (*taskContext, error) { result, err := c.getTask(ctx) @@ -185,7 +215,8 @@ func (c *taskListManagerImpl) GetTaskContext(ctx context.Context) (*taskContext, info: task, workflowExecution: workflowExecution, tlMgr: c, - syncResponseCh: result.C, // nil if task is loaded from persistence + syncResponseCh: result.C, // nil if task is loaded from persistence + queryTaskInfo: result.queryTask, // non-nil for query task } return tCtx, nil }