Skip to content

Commit

Permalink
Consistent query bug fixes (cadence-workflow#2908)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Dec 13, 2019
1 parent 1e63e93 commit 39fabad
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 13 deletions.
6 changes: 3 additions & 3 deletions common/client/versionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ const (
CLI = "cli"

// SupportedGoSDKVersion indicates the highest go sdk version server will accept requests from
SupportedGoSDKVersion = "1.4.0"
SupportedGoSDKVersion = "1.5.0"
// SupportedJavaSDKVersion indicates the highest java sdk version server will accept requests from
SupportedJavaSDKVersion = "1.4.0"
SupportedJavaSDKVersion = "1.5.0"
// SupportedCLIVersion indicates the highest cli version server will accept requests from
SupportedCLIVersion = "1.4.0"
SupportedCLIVersion = "1.5.0"

// GoWorkerStickyQueryVersion indicates the minimum client version of go worker which supports StickyQuery
GoWorkerStickyQueryVersion = "1.0.0"
Expand Down
5 changes: 4 additions & 1 deletion config/dynamicconfig/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,7 @@ frontend.enableClientVersionCheck:
constraints: {}
system.minRetentionDays:
- value: 0
constraints: {}
constraints: {}
history.EnableConsistentQueryByDomain:
- value: true
constrains: {}
2 changes: 1 addition & 1 deletion service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3116,7 +3116,7 @@ func (wh *WorkflowHandler) getDefaultScope(scope int) metrics.Scope {
return wh.GetMetricsClient().Scope(scope).Tagged(metrics.DomainUnknownTag())
}

func frontendInternalServiceError(fmtStr string, args... interface{}) error{
func frontendInternalServiceError(fmtStr string, args ...interface{}) error {
// NOTE: For internal error, we can't return thrift error from cadence-frontend.
// Because in uber internal metrics, thrift errors are counted as user errors.
return fmt.Errorf(fmtStr, args...)
Expand Down
18 changes: 16 additions & 2 deletions service/history/decisionHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,14 @@ Update_History_Loop:
return nil, updateErr
}

handler.handleBufferedQueries(msBuilder, clientImpl, clientFeatureVersion, req.GetCompleteRequest().GetQueryResults(), createNewDecisionTask, domainEntry)
handler.handleBufferedQueries(
msBuilder,
clientImpl,
clientFeatureVersion,
req.GetCompleteRequest().GetQueryResults(),
createNewDecisionTask,
domainEntry,
decisionHeartbeating)

if decisionHeartbeatTimeout {
// at this point, update is successful, but we still return an error to client so that the worker will give up this workflow
Expand Down Expand Up @@ -639,6 +646,7 @@ func (handler *decisionHandlerImpl) handleBufferedQueries(
queryResults map[string]*workflow.WorkflowQueryResult,
createNewDecisionTask bool,
domainEntry *cache.DomainCacheEntry,
decisionHeartbeating bool,
) {
queryRegistry := msBuilder.GetQueryRegistry()
if !queryRegistry.hasBufferedQuery() {
Expand All @@ -658,7 +666,7 @@ func (handler *decisionHandlerImpl) handleBufferedQueries(
scope.IncCounter(metrics.WorkerNotSupportsConsistentQueryCount)
failedTerminationState := &queryTerminationState{
queryTerminationType: queryTerminationTypeFailed,
failure: versionErr,
failure: &workflow.BadRequestError{Message: versionErr.Error()},
}
buffered := queryRegistry.getBufferedIDs()
handler.logger.Info(
Expand All @@ -682,6 +690,12 @@ func (handler *decisionHandlerImpl) handleBufferedQueries(
return
}

// if its a heartbeat decision it means local activities may still be running on the worker
// which were started by an external event which happened before the query
if decisionHeartbeating {
return
}

sizeLimitError := handler.config.BlobSizeLimitError(domain)
sizeLimitWarn := handler.config.BlobSizeLimitWarn(domain)

Expand Down
15 changes: 11 additions & 4 deletions service/history/decisionHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,28 @@ func (s *DecisionHandlerSuite) TearDownTest() {

func (s *DecisionHandlerSuite) TestHandleBufferedQueries_ClientNotSupports() {
s.assertQueryCounts(s.queryRegistry, 10, 0, 0, 0)
s.decisionHandler.handleBufferedQueries(s.mockMutableState, client.GoSDK, "0.0.0", nil, false, testGlobalDomainEntry)
s.decisionHandler.handleBufferedQueries(s.mockMutableState, client.GoSDK, "0.0.0", nil, false, testGlobalDomainEntry, false)
s.assertQueryCounts(s.queryRegistry, 0, 0, 0, 10)
}

func (s *DecisionHandlerSuite) TestHandleBufferedQueries_HeartbeatDecision() {
s.assertQueryCounts(s.queryRegistry, 10, 0, 0, 0)
queryResults := s.constructQueryResults(s.queryRegistry.getBufferedIDs()[0:5], 10)
s.decisionHandler.handleBufferedQueries(s.mockMutableState, client.GoSDK, client.GoWorkerConsistentQueryVersion, queryResults, false, testGlobalDomainEntry, true)
s.assertQueryCounts(s.queryRegistry, 10, 0, 0, 0)
}

func (s *DecisionHandlerSuite) TestHandleBufferedQueries_NewDecisionTask() {
s.assertQueryCounts(s.queryRegistry, 10, 0, 0, 0)
queryResults := s.constructQueryResults(s.queryRegistry.getBufferedIDs()[0:5], 10)
s.decisionHandler.handleBufferedQueries(s.mockMutableState, client.GoSDK, client.GoWorkerConsistentQueryVersion, queryResults, true, testGlobalDomainEntry)
s.decisionHandler.handleBufferedQueries(s.mockMutableState, client.GoSDK, client.GoWorkerConsistentQueryVersion, queryResults, true, testGlobalDomainEntry, false)
s.assertQueryCounts(s.queryRegistry, 5, 5, 0, 0)
}

func (s *DecisionHandlerSuite) TestHandleBufferedQueries_NoNewDecisionTask() {
s.assertQueryCounts(s.queryRegistry, 10, 0, 0, 0)
queryResults := s.constructQueryResults(s.queryRegistry.getBufferedIDs()[0:5], 10)
s.decisionHandler.handleBufferedQueries(s.mockMutableState, client.GoSDK, client.GoWorkerConsistentQueryVersion, queryResults, false, testGlobalDomainEntry)
s.decisionHandler.handleBufferedQueries(s.mockMutableState, client.GoSDK, client.GoWorkerConsistentQueryVersion, queryResults, false, testGlobalDomainEntry, false)
s.assertQueryCounts(s.queryRegistry, 0, 5, 5, 0)
}

Expand All @@ -107,7 +114,7 @@ func (s *DecisionHandlerSuite) TestHandleBufferedQueries_QueryTooLarge() {
for k, v := range largeQueryResults {
queryResults[k] = v
}
s.decisionHandler.handleBufferedQueries(s.mockMutableState, client.GoSDK, client.GoWorkerConsistentQueryVersion, queryResults, false, testGlobalDomainEntry)
s.decisionHandler.handleBufferedQueries(s.mockMutableState, client.GoSDK, client.GoWorkerConsistentQueryVersion, queryResults, false, testGlobalDomainEntry, false)
s.assertQueryCounts(s.queryRegistry, 0, 5, 0, 5)
}

Expand Down
3 changes: 1 addition & 2 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ var (
// ErrEventsAterWorkflowFinish is the error indicating server error trying to write events after workflow finish event
ErrEventsAterWorkflowFinish = &workflow.InternalServiceError{Message: "error validating last event being workflow finish event"}
// ErrQueryEnteredInvalidState is error indicating query entered invalid state
ErrQueryEnteredInvalidState = &workflow.InternalServiceError{Message: "query entered invalid state, this should be impossible"}
ErrQueryEnteredInvalidState = &workflow.BadRequestError{Message: "query entered invalid state, this should be impossible"}
// ErrQueryWorkflowBeforeFirstDecision is error indicating that query was attempted before first decision task completed
ErrQueryWorkflowBeforeFirstDecision = &workflow.BadRequestError{Message: "workflow must handle at least one decision task before it can be queried"}
// ErrConsistentQueryNotEnabled is error indicating that consistent query was requested but either cluster or domain does not enable consistent query
Expand Down Expand Up @@ -1002,7 +1002,6 @@ func (e *historyEngineImpl) queryDirectlyThroughMatching(
func (e *historyEngineImpl) getMutableState(
ctx ctx.Context,
domainID string,

execution workflow.WorkflowExecution,
) (retResp *h.GetMutableStateResponse, retError error) {

Expand Down

0 comments on commit 39fabad

Please sign in to comment.