Skip to content

Commit

Permalink
GetWorkflowExecutionHistory returns invalid nextPageToken (cadence-wo…
Browse files Browse the repository at this point in the history
…rkflow#283)

Make token fields public so they can be serialized properly.
Also change integration test page size to force pagination path to be exercised.
Issue cadence-workflow#282
  • Loading branch information
Tamer Eldeeb authored Jul 20, 2017
1 parent b5c2556 commit a5a7c51
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 23 deletions.
2 changes: 1 addition & 1 deletion host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2557,7 +2557,7 @@ func (s *integrationSuite) printWorkflowHistory(domain string, execution *workfl
historyResponse, err := s.engine.GetWorkflowExecutionHistory(&workflow.GetWorkflowExecutionHistoryRequest{
Domain: common.StringPtr(domain),
Execution: execution,
MaximumPageSize: common.Int32Ptr(10),
MaximumPageSize: common.Int32Ptr(5), // Use small page size to force pagination code path
})
s.Nil(err)

Expand Down
48 changes: 26 additions & 22 deletions service/frontend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ type (
}

getHistoryContinuationToken struct {
runID string
nextEventID int64
persistenceToken []byte
RunID string
NextEventID int64
PersistenceToken []byte
}
)

Expand All @@ -74,14 +74,15 @@ const (
)

var (
errDomainNotSet = &gen.BadRequestError{Message: "Domain not set on request."}
errTaskTokenNotSet = &gen.BadRequestError{Message: "Task token not set on request."}
errTaskListNotSet = &gen.BadRequestError{Message: "TaskList is not set on request."}
errExecutionNotSet = &gen.BadRequestError{Message: "Execution is not set on request."}
errWorkflowIDNotSet = &gen.BadRequestError{Message: "WorkflowId is not set on request."}
errRunIDNotSet = &gen.BadRequestError{Message: "RunId is not set on request."}
errInvalidRunID = &gen.BadRequestError{Message: "Invalid RunId."}
errInvalidNextPageToken = &gen.BadRequestError{Message: "Invalid NextPageToken."}
errDomainNotSet = &gen.BadRequestError{Message: "Domain not set on request."}
errTaskTokenNotSet = &gen.BadRequestError{Message: "Task token not set on request."}
errTaskListNotSet = &gen.BadRequestError{Message: "TaskList is not set on request."}
errExecutionNotSet = &gen.BadRequestError{Message: "Execution is not set on request."}
errWorkflowIDNotSet = &gen.BadRequestError{Message: "WorkflowId is not set on request."}
errRunIDNotSet = &gen.BadRequestError{Message: "RunId is not set on request."}
errInvalidRunID = &gen.BadRequestError{Message: "Invalid RunId."}
errInvalidNextPageToken = &gen.BadRequestError{Message: "Invalid NextPageToken."}
errNextPageTokenRunIDMismatch = &gen.BadRequestError{Message: "RunID in the request does not match the NextPageToken."}
)

// NewWorkflowHandler creates a thrift handler for the cadence service
Expand Down Expand Up @@ -645,14 +646,17 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
if err != nil {
return nil, wh.error(errInvalidNextPageToken, scope)
}
if getRequest.GetExecution().IsSetRunId() && getRequest.GetExecution().GetRunId() != token.RunID {
return nil, wh.error(errNextPageTokenRunIDMismatch, scope)
}
} else {
response, err := wh.history.GetWorkflowExecutionNextEventID(ctx, &h.GetWorkflowExecutionNextEventIDRequest{
DomainUUID: common.StringPtr(info.ID),
Execution: getRequest.GetExecution(),
})
if err == nil {
token.nextEventID = response.GetEventId()
token.runID = response.GetRunId()
token.NextEventID = response.GetEventId()
token.RunID = response.GetRunId()
} else {
if _, ok := err.(*gen.EntityNotExistsError); !ok || !getRequest.GetExecution().IsSetRunId() {
return nil, wh.error(err, scope)
Expand All @@ -666,27 +670,27 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
if err != nil {
return nil, wh.error(err, scope)
}
token.nextEventID = visibilityResp.Execution.GetHistoryLength()
token.runID = visibilityResp.Execution.GetExecution().GetRunId()
token.NextEventID = visibilityResp.Execution.GetHistoryLength()
token.RunID = visibilityResp.Execution.GetExecution().GetRunId()
}
}

we := gen.WorkflowExecution{
WorkflowId: getRequest.GetExecution().WorkflowId,
RunId: common.StringPtr(token.runID),
RunId: common.StringPtr(token.RunID),
}
history, persistenceToken, err :=
wh.getHistory(info.ID, we, token.nextEventID, getRequest.GetMaximumPageSize(), getRequest.GetNextPageToken())
wh.getHistory(info.ID, we, token.NextEventID, getRequest.GetMaximumPageSize(), token.PersistenceToken)
if err != nil {
return nil, wh.error(err, scope)
}

nextToken, err := getSerializedGetHistoryToken(persistenceToken, token.runID, history, token.nextEventID)
nextToken, err := getSerializedGetHistoryToken(persistenceToken, token.RunID, history, token.NextEventID)
if err != nil {
return nil, wh.error(err, scope)
}

return createGetWorkflowExecutionHistoryResponse(history, token.nextEventID, nextToken), nil
return createGetWorkflowExecutionHistoryResponse(history, token.NextEventID, nextToken), nil
}

// SignalWorkflowExecution is used to send a signal event to running workflow execution. This results in
Expand Down Expand Up @@ -1150,9 +1154,9 @@ func getSerializedGetHistoryToken(persistenceToken []byte, runID string, history
events := history.GetEvents()
if len(persistenceToken) > 0 && len(events) > 0 && events[len(events)-1].GetEventId() < nextEventID-1 {
token := &getHistoryContinuationToken{
runID: runID,
nextEventID: nextEventID,
persistenceToken: persistenceToken,
RunID: runID,
NextEventID: nextEventID,
PersistenceToken: persistenceToken,
}
data, err := json.Marshal(token)

Expand Down

0 comments on commit a5a7c51

Please sign in to comment.