Skip to content

Commit

Permalink
Query API (cadence-workflow#2482)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Aug 29, 2019
1 parent 8da40ae commit 24147da
Show file tree
Hide file tree
Showing 19 changed files with 6,112 additions and 4,534 deletions.
10,221 changes: 5,701 additions & 4,520 deletions .gen/go/history/history.go

Large diffs are not rendered by default.

29 changes: 29 additions & 0 deletions .gen/go/history/historyserviceclient/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 36 additions & 1 deletion .gen/go/history/historyserviceserver/server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions .gen/go/history/historyservicetest/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,31 @@ func (c *clientImpl) SyncActivity(
return err
}

func (c *clientImpl) QueryWorkflow(
ctx context.Context,
request *h.QueryWorkflowRequest,
opts ...yarpc.CallOption,
) (*h.QueryWorkflowResponse, error) {
client, err := c.getClientForWorkflowID(request.Execution.GetWorkflowId())
if err != nil {
return nil, err
}
opts = common.AggregateYarpcOptions(ctx, opts...)
var response *h.QueryWorkflowResponse
op := func(ctx context.Context, client historyserviceclient.Interface) error {
var err error
ctx, cancel := c.createContext(ctx)
defer cancel()
response, err = client.QueryWorkflow(ctx, request, opts...)
return err
}
err = c.executeWithRedirect(ctx, client, op)
if err != nil {
return nil, err
}
return response, nil
}

func (c *clientImpl) GetReplicationMessages(
ctx context.Context,
request *replicator.GetReplicationMessagesRequest,
Expand Down
18 changes: 18 additions & 0 deletions client/history/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,3 +506,21 @@ func (c *metricClient) GetReplicationMessages(

return resp, err
}

func (c *metricClient) QueryWorkflow(
ctx context.Context,
request *h.QueryWorkflowRequest,
opts ...yarpc.CallOption,
) (*h.QueryWorkflowResponse, error) {
c.metricsClient.IncCounter(metrics.HistoryClientQueryWorkflowScope, metrics.CadenceClientRequests)

sw := c.metricsClient.StartTimer(metrics.HistoryClientQueryWorkflowScope, metrics.CadenceClientLatency)
resp, err := c.client.QueryWorkflow(ctx, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientQueryWorkflowScope, metrics.CadenceClientFailures)
}

return resp, err
}
16 changes: 16 additions & 0 deletions client/history/retryableClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,19 @@ func (c *retryableClient) GetReplicationMessages(
err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}

func (c *retryableClient) QueryWorkflow(
ctx context.Context,
request *h.QueryWorkflowRequest,
opts ...yarpc.CallOption,
) (*h.QueryWorkflowResponse, error) {
var resp *h.QueryWorkflowResponse
op := func() error {
var err error
resp, err = c.client.QueryWorkflow(ctx, request, opts...)
return err
}

err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}
6 changes: 6 additions & 0 deletions common/client/clientFeature.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ func (feature *FeatureImpl) SupportStickyQuery() bool {
return feature.featureVersion.major > 0
}

// SupportConsistentQuery whether a client supports consistent query
func (feature *FeatureImpl) SupportConsistentQuery() bool {
// TODO: andrewjdawson2016 once client side changes for consistent query are done then update this
return false
}

func parseVersion(versionStr string) version {
var major int64
var minor int64
Expand Down
8 changes: 8 additions & 0 deletions common/client/clientFeature_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,11 @@ func (s *FeatureSuite) TestSupportStickyQuery() {
feature = NewFeatureImpl(libVersion, featureVersion, lang)
s.False(feature.SupportStickyQuery(), "Should not support sticky query")
}

func (s *FeatureSuite) TestSupportConsistentQuery() {
libVersion := "0.5.0"
featureVersion := "1.0.0"
lang := "go"
feature := NewFeatureImpl(libVersion, featureVersion, lang)
s.False(feature.SupportConsistentQuery())
}
6 changes: 6 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ const (
HistoryClientSyncActivityScope
// HistoryClientGetReplicationTasksScope tracks RPC calls to history service
HistoryClientGetReplicationTasksScope
// HistoryClientQueryWorkflowScope tracks RPC calls to history service
HistoryClientQueryWorkflowScope
// MatchingClientPollForDecisionTaskScope tracks RPC calls to matching service
MatchingClientPollForDecisionTaskScope
// MatchingClientPollForActivityTaskScope tracks RPC calls to matching service
Expand Down Expand Up @@ -808,6 +810,8 @@ const (
SessionCountStatsScope
// HistoryResetWorkflowExecutionScope tracks ResetWorkflowExecution API calls received by service
HistoryResetWorkflowExecutionScope
// HistoryQueryWorkflowScope tracks QueryWorkflow API calls received by service
HistoryQueryWorkflowScope
// HistoryProcessDeleteHistoryEventScope tracks ProcessDeleteHistoryEvent processing calls
HistoryProcessDeleteHistoryEventScope
// WorkflowCompletionStatsScope tracks workflow completion updates
Expand Down Expand Up @@ -972,6 +976,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
HistoryClientSyncShardStatusScope: {operation: "HistoryClientSyncShardStatusScope", tags: map[string]string{CadenceRoleTagName: HistoryRoleTagValue}},
HistoryClientSyncActivityScope: {operation: "HistoryClientSyncActivityScope", tags: map[string]string{CadenceRoleTagName: HistoryRoleTagValue}},
HistoryClientGetReplicationTasksScope: {operation: "HistoryClientGetReplicationTasksScope", tags: map[string]string{CadenceRoleTagName: HistoryRoleTagValue}},
HistoryClientQueryWorkflowScope: {operation: "HistoryClientQueryWorkflowScope", tags: map[string]string{CadenceRoleTagName: HistoryRoleTagValue}},
MatchingClientPollForDecisionTaskScope: {operation: "MatchingClientPollForDecisionTask", tags: map[string]string{CadenceRoleTagName: MatchingRoleTagValue}},
MatchingClientPollForActivityTaskScope: {operation: "MatchingClientPollForActivityTask", tags: map[string]string{CadenceRoleTagName: MatchingRoleTagValue}},
MatchingClientAddActivityTaskScope: {operation: "MatchingClientAddActivityTask", tags: map[string]string{CadenceRoleTagName: MatchingRoleTagValue}},
Expand Down Expand Up @@ -1158,6 +1163,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
HistoryRemoveSignalMutableStateScope: {operation: "RemoveSignalMutableState"},
HistoryTerminateWorkflowExecutionScope: {operation: "TerminateWorkflowExecution"},
HistoryResetWorkflowExecutionScope: {operation: "ResetWorkflowExecution"},
HistoryQueryWorkflowScope: {operation: "QueryWorkflow"},
HistoryProcessDeleteHistoryEventScope: {operation: "ProcessDeleteHistoryEvent"},
HistoryScheduleDecisionTaskScope: {operation: "ScheduleDecisionTask"},
HistoryRecordChildExecutionCompletedScope: {operation: "RecordChildExecutionCompleted"},
Expand Down
24 changes: 24 additions & 0 deletions idl/github.com/uber/cadence/history.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,16 @@ struct SyncActivityRequest {
130: optional string lastWorkerIdentity
}

struct QueryWorkflowRequest {
10: optional string domainUUID
20: optional shared.WorkflowExecution execution
30: optional shared.WorkflowQuery query
}

struct QueryWorkflowResponse {
10: optional binary queryResult
}

/**
* HistoryService provides API to start a new long running workflow instance, as well as query and update the history
* of workflow instances already created.
Expand Down Expand Up @@ -724,4 +734,18 @@ service HistoryService {
4: shared.ServiceBusyError serviceBusyError,
5: shared.ClientVersionNotSupportedError clientVersionNotSupportedError,
)

/**
* QueryWorkflow returns query result for a specified workflow execution
**/
QueryWorkflowResponse QueryWorkflow(1: QueryWorkflowRequest queryRequest)
throws (
1: shared.BadRequestError badRequestError,
2: shared.InternalServiceError internalServiceError,
3: shared.EntityNotExistsError entityNotExistError,
4: shared.QueryFailedError queryFailedError,
5: shared.LimitExceededError limitExceededError,
6: shared.ServiceBusyError serviceBusyError,
7: shared.ClientVersionNotSupportedError clientVersionNotSupportedError,
)
}
48 changes: 36 additions & 12 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2721,11 +2721,6 @@ func (wh *WorkflowHandler) QueryWorkflow(
return nil, wh.error(err, scope)
}

matchingRequest := &m.QueryWorkflowRequest{
DomainUUID: common.StringPtr(domainID),
QueryRequest: queryRequest,
}

// we should always use the mutable state, since it contains the sticky tasklist information
response, err := wh.history.GetMutableState(ctx, &h.GetMutableStateRequest{
DomainUUID: common.StringPtr(domainID),
Expand Down Expand Up @@ -2758,11 +2753,40 @@ func (wh *WorkflowHandler) QueryWorkflow(
response.GetClientFeatureVersion(),
response.GetClientImpl(),
)

queryRequest.Execution.RunId = response.Execution.RunId
if len(response.StickyTaskList.GetName()) != 0 && clientFeature.SupportStickyQuery() {
matchingRequest.TaskList = response.StickyTaskList
stickyDecisionTimeout := response.GetStickyTaskListScheduleToStartTimeout()
if clientFeature.SupportConsistentQuery() {
req := &h.QueryWorkflowRequest{
DomainUUID: common.StringPtr(domainID),
Execution: queryRequest.GetExecution(),
Query: queryRequest.GetQuery(),
}
resp, err := wh.history.QueryWorkflow(ctx, req)
if err != nil {
return nil, err
}
return &gen.QueryWorkflowResponse{
QueryResult: resp.GetQueryResult(),
QueryRejected: nil,
}, nil
}
return wh.queryDirectlyThroughMatching(ctx, response, clientFeature, domainID, queryRequest, scope)
}

func (wh *WorkflowHandler) queryDirectlyThroughMatching(
ctx context.Context,
getMutableStateResponse *h.GetMutableStateResponse,
clientFeature client.Feature,
domainID string,
queryRequest *gen.QueryWorkflowRequest,
scope metrics.Scope,
) (*gen.QueryWorkflowResponse, error) {
matchingRequest := &m.QueryWorkflowRequest{
DomainUUID: common.StringPtr(domainID),
QueryRequest: queryRequest,
}
if len(getMutableStateResponse.StickyTaskList.GetName()) != 0 && clientFeature.SupportStickyQuery() {
matchingRequest.TaskList = getMutableStateResponse.StickyTaskList
stickyDecisionTimeout := getMutableStateResponse.GetStickyTaskListScheduleToStartTimeout()
// using a clean new context in case customer provide a context which has
// a really short deadline, causing we clear the stickyness
stickyContext, cancel := context.WithTimeout(context.Background(), time.Duration(stickyDecisionTimeout)*time.Second)
Expand All @@ -2786,8 +2810,8 @@ func (wh *WorkflowHandler) QueryWorkflow(
tag.WorkflowID(queryRequest.Execution.GetWorkflowId()),
tag.WorkflowRunID(queryRequest.Execution.GetRunId()),
tag.WorkflowQueryType(queryRequest.Query.GetQueryType()),
tag.WorkflowTaskListName(response.GetStickyTaskList().GetName()),
tag.WorkflowNextEventID(response.GetNextEventId()))
tag.WorkflowTaskListName(getMutableStateResponse.GetStickyTaskList().GetName()),
tag.WorkflowNextEventID(getMutableStateResponse.GetNextEventId()))
resetContext, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, err = wh.history.ResetStickyTaskList(resetContext, &h.ResetStickyTaskListRequest{
DomainUUID: common.StringPtr(domainID),
Expand All @@ -2799,7 +2823,7 @@ func (wh *WorkflowHandler) QueryWorkflow(
}
}

matchingRequest.TaskList = response.TaskList
matchingRequest.TaskList = getMutableStateResponse.TaskList
matchingResp, err := wh.matching.QueryWorkflow(ctx, matchingRequest)
if err != nil {
wh.Service.GetLogger().Info("QueryWorkflowFailed.",
Expand Down
Loading

0 comments on commit 24147da

Please sign in to comment.