Skip to content

Commit

Permalink
query API server side implementation (cadence-workflow#347)
Browse files Browse the repository at this point in the history
* query impl

* avoid using error as variable name

* address comments

* comments

* address comments & adding integration test
  • Loading branch information
yiminc authored Sep 16, 2017
1 parent a4b6171 commit 8f5a16f
Show file tree
Hide file tree
Showing 13 changed files with 638 additions and 45 deletions.
16 changes: 14 additions & 2 deletions client/matching/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,23 @@ func (c *clientImpl) PollForDecisionTask(
}

func (c *clientImpl) QueryWorkflow(ctx context.Context, queryRequest *m.QueryWorkflowRequest, opts ...yarpc.CallOption) (*workflow.QueryWorkflowResponse, error) {
return nil, &workflow.InternalServiceError{Message: "Not implemented yet"}
client, err := c.getHostForRequest(*queryRequest.TaskList.Name)
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(ctx)
defer cancel()
return client.QueryWorkflow(ctx, queryRequest)
}

func (c *clientImpl) RespondQueryTaskCompleted(ctx context.Context, request *m.RespondQueryTaskCompletedRequest, opts ...yarpc.CallOption) error {
return &workflow.InternalServiceError{Message: "Not implemented yet"}
client, err := c.getHostForRequest(*request.TaskList.Name)
if err != nil {
return err
}
ctx, cancel := c.createContext(ctx)
defer cancel()
return client.RespondQueryTaskCompleted(ctx, request)
}

func (c *clientImpl) getHostForRequest(key string) (matchingserviceclient.Interface, error) {
Expand Down
13 changes: 13 additions & 0 deletions common/jsonTaskTokenSerializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,16 @@ func (j *jsonTaskTokenSerializer) Deserialize(data []byte) (*TaskToken, error) {

return &token, err
}

func (j *jsonTaskTokenSerializer) SerializeQueryTaskToken(token *QueryTaskToken) ([]byte, error) {
data, err := json.Marshal(token)

return data, err
}

func (j *jsonTaskTokenSerializer) DeserializeQueryTaskToken(data []byte) (*QueryTaskToken, error) {
var token QueryTaskToken
err := json.Unmarshal(data, &token)

return &token, err
}
4 changes: 4 additions & 0 deletions common/logging/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ const (
TaskListUnloaded = 5003
TaskListLoadingFailed = 5004

// Query task events
InvalidQueryTaskEventID = 6000
QueryTaskFailedEventID = 6001

// General purpose events
OperationFailed = 9000
OperationPanic = 9001
Expand Down
21 changes: 21 additions & 0 deletions common/logging/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,24 @@ func LogTaskListUnloadedEvent(logger bark.Logger) {
TagWorkflowEventID: TaskListUnloaded,
}).Info("Unloaded TaskList.")
}

// LogQueryTaskMissingWorkflowTypeErrorEvent is used to log invalid query task that is missing workflow type
func LogQueryTaskMissingWorkflowTypeErrorEvent(logger bark.Logger, workflowID, runID, queryType string) {
logger.WithFields(bark.Fields{
TagWorkflowEventID: InvalidQueryTaskEventID,
"WorkflowID": workflowID,
"RunID": runID,
"QueryType": queryType,
}).Error("Cannot get WorkflowType for QueryTask.")
}

// LogQueryTaskFailedEvent is used to log query task failure
func LogQueryTaskFailedEvent(logger bark.Logger, domain, workflowID, runID, queryType, errMsg string) {
logger.WithFields(bark.Fields{
TagWorkflowEventID: QueryTaskFailedEventID,
"Domain": domain,
"WorkflowID": workflowID,
"RunID": runID,
"QueryType": queryType,
}).Info(errMsg)
}
39 changes: 27 additions & 12 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ const (
FrontendRecordActivityTaskHeartbeatScope
// FrontendRespondDecisionTaskCompletedScope is the metric scope for frontend.RespondDecisionTaskCompleted
FrontendRespondDecisionTaskCompletedScope
// FrontendRespondQueryTaskCompletedScope is the metric scope for frontend.RespondQueryTaskCompleted
FrontendRespondQueryTaskCompletedScope
// FrontendRespondActivityTaskCompletedScope is the metric scope for frontend.RespondActivityTaskCompleted
FrontendRespondActivityTaskCompletedScope
// FrontendRespondActivityTaskFailedScope is the metric scope for frontend.RespondActivityTaskFailed
Expand All @@ -243,6 +245,8 @@ const (
FrontendUpdateDomainScope
// FrontendDeprecateDomainScope is the metric scope for frontend.DeprecateDomain
FrontendDeprecateDomainScope
// FrontendQueryWorkflowScope is the metric scope for frontend.QueryWorkflow
FrontendQueryWorkflowScope

NumFrontendScopes
)
Expand Down Expand Up @@ -319,6 +323,10 @@ const (
MatchingAddDecisionTaskScope
// MatchingTaskListMgrScope is the metrics scope for matching.TaskListManager component
MatchingTaskListMgrScope
// MatchingQueryWorkflowScope tracks AddDecisionTask API calls received by service
MatchingQueryWorkflowScope
// MatchingRespondQueryTaskCompletedScope tracks AddDecisionTask API calls received by service
MatchingRespondQueryTaskCompletedScope

NumMatchingScopes
)
Expand Down Expand Up @@ -381,6 +389,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
FrontendPollForActivityTaskScope: {operation: "PollForActivityTask"},
FrontendRecordActivityTaskHeartbeatScope: {operation: "RecordActivityTaskHeartbeat"},
FrontendRespondDecisionTaskCompletedScope: {operation: "RespondDecisionTaskCompleted"},
FrontendRespondQueryTaskCompletedScope: {operation: "RespondQueryTaskCompleted"},
FrontendRespondActivityTaskCompletedScope: {operation: "RespondActivityTaskCompleted"},
FrontendRespondActivityTaskFailedScope: {operation: "RespondActivityTaskFailed"},
FrontendRespondActivityTaskCanceledScope: {operation: "RespondActivityTaskCanceled"},
Expand All @@ -394,6 +403,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
FrontendDescribeDomainScope: {operation: "DescribeDomain"},
FrontendUpdateDomainScope: {operation: "UpdateDomain"},
FrontendDeprecateDomainScope: {operation: "DeprecateDomain"},
FrontendQueryWorkflowScope: {operation: "QueryWorkflow"},
},
// History Scope Names
History: {
Expand Down Expand Up @@ -427,11 +437,13 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
},
// Matching Scope Names
Matching: {
MatchingPollForDecisionTaskScope: {operation: "PollForDecisionTask"},
MatchingPollForActivityTaskScope: {operation: "PollForActivityTask"},
MatchingAddActivityTaskScope: {operation: "AddActivityTask"},
MatchingAddDecisionTaskScope: {operation: "AddDecisionTask"},
MatchingTaskListMgrScope: {operation: "TaskListMgr"},
MatchingPollForDecisionTaskScope: {operation: "PollForDecisionTask"},
MatchingPollForActivityTaskScope: {operation: "PollForActivityTask"},
MatchingAddActivityTaskScope: {operation: "AddActivityTask"},
MatchingAddDecisionTaskScope: {operation: "AddDecisionTask"},
MatchingTaskListMgrScope: {operation: "TaskListMgr"},
MatchingQueryWorkflowScope: {operation: "QueryWorkflow"},
MatchingRespondQueryTaskCompletedScope: {operation: "RespondQueryTaskCompleted"},
},
}

Expand All @@ -446,6 +458,7 @@ const (
CadenceErrExecutionAlreadyStartedCounter
CadenceErrDomainAlreadyExistsCounter
CadenceErrCancellationAlreadyRequestedCounter
CadenceErrQueryFailedCounter
PersistenceRequests
PersistenceFailures
PersistenceLatency
Expand Down Expand Up @@ -509,6 +522,7 @@ const (
LeaseRequestCounter
LeaseFailureCounter
ConditionFailedErrorCounter
RespondQueryTaskFailedCounter
)

// MetricDefs record the metrics for all services
Expand Down Expand Up @@ -574,13 +588,14 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
RemoveEngineForShardLatency: {metricName: "remove-engine-for-shard-latency", metricType: Timer},
},
Matching: {
PollSuccessCounter: {metricName: "poll.success"},
PollTimeoutCounter: {metricName: "poll.timeouts"},
PollErrorsCounter: {metricName: "poll.errors"},
PollSuccessWithSyncCounter: {metricName: "poll.success.sync"},
LeaseRequestCounter: {metricName: "lease.requests"},
LeaseFailureCounter: {metricName: "lease.failures"},
ConditionFailedErrorCounter: {metricName: "condition-failed-errors"},
PollSuccessCounter: {metricName: "poll.success"},
PollTimeoutCounter: {metricName: "poll.timeouts"},
PollErrorsCounter: {metricName: "poll.errors"},
PollSuccessWithSyncCounter: {metricName: "poll.success.sync"},
LeaseRequestCounter: {metricName: "lease.requests"},
LeaseFailureCounter: {metricName: "lease.failures"},
ConditionFailedErrorCounter: {metricName: "condition-failed-errors"},
RespondQueryTaskFailedCounter: {metricName: "respond-query-failed"},
},
}

Expand Down
9 changes: 9 additions & 0 deletions common/taskTokenSerializerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type (
TaskTokenSerializer interface {
Serialize(token *TaskToken) ([]byte, error)
Deserialize(data []byte) (*TaskToken, error)
SerializeQueryTaskToken(token *QueryTaskToken) ([]byte, error)
DeserializeQueryTaskToken(data []byte) (*QueryTaskToken, error)
}

// TaskToken identifies a task
Expand All @@ -34,4 +36,11 @@ type (
RunID string `json:"runId"`
ScheduleID int64 `json:"scheduleId"`
}

// QueryTaskToken identifies a query task
QueryTaskToken struct {
DomainID string `json:"domainId"`
TaskList string `jaon:"taskList"`
TaskID string `jaon:"taskId"`
}
)
Loading

0 comments on commit 8f5a16f

Please sign in to comment.