Skip to content

Commit

Permalink
Return when current branch token changed in poll mutable state API (c…
Browse files Browse the repository at this point in the history
…adence-workflow#2396)

* split GetMutableState API to two sync and long poll
  • Loading branch information
yux0 authored Aug 19, 2019
1 parent da2c5bf commit f5fbb20
Show file tree
Hide file tree
Showing 15 changed files with 401 additions and 146 deletions.
198 changes: 141 additions & 57 deletions .gen/go/history/history.go

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions .gen/go/history/historyserviceclient/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions .gen/go/history/historyserviceserver/server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions .gen/go/history/historyservicetest/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions idl/github.com/uber/cadence/history.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct GetMutableStateRequest {
10: optional string domainUUID
20: optional shared.WorkflowExecution execution
30: optional i64 (js.type = "Long") expectedNextEventId
40: optional binary currentBranchToken
}

struct GetMutableStateResponse {
Expand All @@ -82,7 +83,7 @@ struct GetMutableStateResponse {
100: optional bool isWorkflowRunning
110: optional i32 stickyTaskListScheduleToStartTimeout
120: optional i32 eventStoreVersion
130: optional binary branchToken
130: optional binary currentBranchToken
140: optional map<string, shared.ReplicationInfo> replicationInfo
150: optional shared.VersionHistories versionHistories
//TODO: change these fields to enum when possible
Expand All @@ -94,6 +95,7 @@ struct PollMutableStateRequest {
10: optional string domainUUID
20: optional shared.WorkflowExecution execution
30: optional i64 (js.type = "Long") expectedNextEventId
40: optional binary currentBranchToken
}

struct PollMutableStateResponse {
Expand All @@ -108,10 +110,10 @@ struct PollMutableStateResponse {
80: optional string clientFeatureVersion
90: optional string clientImpl
100: optional i32 stickyTaskListScheduleToStartTimeout
110: optional binary branchToken
110: optional binary currentBranchToken
120: optional map<string, shared.ReplicationInfo> replicationInfo
130: optional shared.VersionHistories versionHistories
//TODO: change these fields to enum when possible
//TODO: change these fields to enum when possible
140: optional i32 workflowState
150: optional i32 workflowCloseState
}
Expand Down Expand Up @@ -357,7 +359,7 @@ service HistoryService {
* It fails with 'EntityNotExistError' if specified workflow execution in unknown to the service.
* It returns CurrentBranchChangedError if the workflow version branch has changed.
**/
PollMutableStateResponse PollMutableState(1: PollMutableStateRequest getRequest)
PollMutableStateResponse PollMutableState(1: PollMutableStateRequest pollRequest)
throws (
1: shared.BadRequestError badRequestError,
2: shared.InternalServiceError internalServiceError,
Expand Down
23 changes: 13 additions & 10 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,8 @@ func (adh *AdminHandler) GetWorkflowExecutionRawHistory(
}

response, err := adh.history.GetMutableState(ctx, &h.GetMutableStateRequest{
DomainUUID: common.StringPtr(domainID),
Execution: execution,
ExpectedNextEventId: common.Int64Ptr(common.FirstEventID), // common.FirstEventID means no long poll
DomainUUID: common.StringPtr(domainID),
Execution: execution,
})
if err != nil {
return nil, err
Expand All @@ -310,13 +309,17 @@ func (adh *AdminHandler) GetWorkflowExecutionRawHistory(
nextEventID = response.GetNextEventId()
}
token = &getHistoryContinuationToken{
RunID: execution.GetRunId(),
BranchToken: response.BranchToken,
FirstEventID: firstEventID,
NextEventID: nextEventID,
PersistenceToken: nil, // this is the initialized value
EventStoreVersion: response.GetEventStoreVersion(),
ReplicationInfo: response.ReplicationInfo,
RunID: execution.GetRunId(),
BranchToken: response.CurrentBranchToken,
FirstEventID: firstEventID,
NextEventID: nextEventID,
PersistenceToken: nil, // this is the initialized value
ReplicationInfo: response.ReplicationInfo,
}
// calculate event store version based on if branch token exist
token.EventStoreVersion = persistence.EventStoreVersionV2
if token.BranchToken == nil {
token.EventStoreVersion = 0
}
}

Expand Down
39 changes: 29 additions & 10 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1686,18 +1686,37 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
// 3. the next event ID
// 4. whether the workflow is closed
// 5. error if any
queryHistory := func(domainUUID string, execution *gen.WorkflowExecution, expectedNextEventID int64) (int32, []byte, string, int64, int64, bool, error) {
response, err := wh.history.GetMutableState(ctx, &h.GetMutableStateRequest{
queryHistory := func(
domainUUID string,
execution *gen.WorkflowExecution,
expectedNextEventID int64,
currentBranchToken []byte,
) (int32, []byte, string, int64, int64, bool, error) {
response, err := wh.history.PollMutableState(ctx, &h.PollMutableStateRequest{
DomainUUID: common.StringPtr(domainUUID),
Execution: execution,
ExpectedNextEventId: common.Int64Ptr(expectedNextEventID),
CurrentBranchToken: currentBranchToken,
})

if err != nil {
return 0, nil, "", 0, 0, false, err
}
isWorkflowRunning := response.GetWorkflowCloseState() == persistence.WorkflowCloseStatusNone

return response.GetEventStoreVersion(), response.BranchToken, response.Execution.GetRunId(), response.GetLastFirstEventId(), response.GetNextEventId(), response.GetIsWorkflowRunning(), nil
// calculate event store version based on if branch token exist
eventStoreVersion := persistence.EventStoreVersionV2
if len(response.GetCurrentBranchToken()) == 0 {
eventStoreVersion = 0
}

return int32(eventStoreVersion),
response.CurrentBranchToken,
response.Execution.GetRunId(),
response.GetLastFirstEventId(),
response.GetNextEventId(),
isWorkflowRunning,
nil
}

isLongPoll := getRequest.GetWaitForNewEvent()
Expand Down Expand Up @@ -1728,7 +1747,8 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
if !isCloseEventOnly {
queryNextEventID = token.NextEventID
}
token.EventStoreVersion, token.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, err = queryHistory(domainID, execution, queryNextEventID)
token.EventStoreVersion, token.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, err =
queryHistory(domainID, execution, queryNextEventID, token.BranchToken)
if err != nil {
return nil, wh.error(err, scope)
}
Expand All @@ -1740,7 +1760,8 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
if !isCloseEventOnly {
queryNextEventID = common.FirstEventID
}
token.EventStoreVersion, token.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, err = queryHistory(domainID, execution, queryNextEventID)
token.EventStoreVersion, token.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, err =
queryHistory(domainID, execution, queryNextEventID, nil)
if err != nil {
return nil, wh.error(err, scope)
}
Expand All @@ -1767,7 +1788,6 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
getRequest.GetMaximumPageSize(),
nil,
token.TransientDecision,
token.EventStoreVersion,
token.BranchToken,
)
if err != nil {
Expand Down Expand Up @@ -1800,7 +1820,6 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
getRequest.GetMaximumPageSize(),
token.PersistenceToken,
token.TransientDecision,
token.EventStoreVersion,
token.BranchToken,
)
if err != nil {
Expand Down Expand Up @@ -2882,13 +2901,12 @@ func (wh *WorkflowHandler) getHistory(
pageSize int32,
nextPageToken []byte,
transientDecision *gen.TransientDecisionInfo,
eventStoreVersion int32,
branchToken []byte,
) (*gen.History, []byte, error) {

historyEvents := []*gen.HistoryEvent{}
var size int
if eventStoreVersion == persistence.EventStoreVersionV2 {
if len(branchToken) != 0 {
shardID := common.WorkflowIDToHistoryShard(*execution.WorkflowId, wh.config.NumHistoryShards)
var err error
historyEvents, size, nextPageToken, err = persistence.ReadFullPageV2Events(wh.historyV2Mgr, &persistence.ReadHistoryBranchRequest{
Expand Down Expand Up @@ -3125,7 +3143,8 @@ func (wh *WorkflowHandler) createPollForDecisionTaskResponse(
nextEventID,
int32(wh.config.HistoryMaxPageSize(domain.GetInfo().Name)),
nil,
matchingResp.DecisionInfo, eventStoreVersion, branchToken,
matchingResp.DecisionInfo,
branchToken,
)
if err != nil {
return nil, err
Expand Down
5 changes: 3 additions & 2 deletions service/frontend/workflowHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1128,13 +1128,14 @@ func (s *workflowHandlerSuite) TestGetHistory() {
domainID := uuid.New()
firstEventID := int64(100)
nextEventID := int64(101)
branchToken := []byte{1}
we := gen.WorkflowExecution{
WorkflowId: common.StringPtr("wid"),
RunId: common.StringPtr("rid"),
}
shardID := common.WorkflowIDToHistoryShard(*we.WorkflowId, numHistoryShards)
req := &persistence.ReadHistoryBranchRequest{
BranchToken: []byte{},
BranchToken: branchToken,
MinEventID: firstEventID,
MaxEventID: nextEventID,
PageSize: 0,
Expand All @@ -1156,7 +1157,7 @@ func (s *workflowHandlerSuite) TestGetHistory() {
wh := s.getWorkflowHandlerWithParams(mService, config, mMetadataManager)
wh.metricsClient = wh.Service.GetMetricsClient()
scope := wh.metricsClient.Scope(0)
history, token, err := wh.getHistory(scope, domainID, we, firstEventID, nextEventID, 0, []byte{}, nil, persistence.EventStoreVersionV2, []byte{})
history, token, err := wh.getHistory(scope, domainID, we, firstEventID, nextEventID, 0, []byte{}, nil, branchToken)
s.NotNil(history)
s.Equal([]byte{}, token)
s.NoError(err)
Expand Down
Loading

0 comments on commit f5fbb20

Please sign in to comment.