Skip to content

Commit

Permalink
Returning empty response for last empty history events read request (c…
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Apr 13, 2019
1 parent f6bd09d commit d5d74d9
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 18 deletions.
12 changes: 0 additions & 12 deletions common/persistence/cassandra/cassandraHistoryPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ func (h *cassandraHistoryPersistence) GetWorkflowExecutionHistory(request *p.Int
}
}

found := false
nextPageToken := iter.PageState()

//NOTE: in this method, we need to make sure is NOT decreasing(otherwise we skip the events)
Expand All @@ -177,8 +176,6 @@ func (h *cassandraHistoryPersistence) GetWorkflowExecutionHistory(request *p.Int
history := make([]*p.DataBlob, 0, request.PageSize)

for iter.Scan(nil, &eventBatchVersionPointer, &eventBatch.Data, &eventBatch.Encoding) {
found = true

if eventBatchVersionPointer != nil {
eventBatchVersion = *eventBatchVersionPointer
}
Expand All @@ -198,15 +195,6 @@ func (h *cassandraHistoryPersistence) GetWorkflowExecutionHistory(request *p.Int
}
}

if !found && len(request.NextPageToken) == 0 {
// adding the check of request next token being not nil, since
// there can be case when found == false at the very end of pagination.
return nil, &workflow.EntityNotExistsError{
Message: fmt.Sprintf("Workflow execution history not found. WorkflowId: %v, RunId: %v",
*execution.WorkflowId, *execution.RunId),
}
}

response := &p.InternalGetWorkflowExecutionHistoryResponse{
NextPageToken: nextPageToken,
History: history,
Expand Down
7 changes: 7 additions & 0 deletions common/persistence/historyStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ func (m *historyManagerImpl) getWorkflowExecutionHistory(request *GetWorkflowExe
if err != nil {
return nil, nil, nil, 0, 0, err
}
if len(response.History) == 0 && len(request.NextPageToken) == 0 {
return nil, nil, nil, 0, 0, &workflow.EntityNotExistsError{
Message: fmt.Sprintf("Workflow execution history not found. WorkflowId: %v, RunId: %v",
request.Execution.GetWorkflowId(), request.Execution.GetRunId()),
}
}

// we store LastEventBatchVersion in the token. The reason we do it here is for historic reason.
token.LastEventBatchVersion = response.LastEventBatchVersion
token.Data = response.NextPageToken
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/historyV2Store.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,9 @@ func (m *historyV2ManagerImpl) readHistoryBranch(byBatch bool, request *ReadHist
if err != nil {
return nil, nil, nil, 0, 0, err
}
if len(resp.History) == 0 && len(request.NextPageToken) == 0 {
return nil, nil, nil, 0, 0, &workflow.EntityNotExistsError{Message: "Workflow execution history not found."}
}

events := make([]*workflow.HistoryEvent, 0, request.PageSize)
historyBatches := make([]*workflow.History, 0, request.PageSize)
Expand Down
28 changes: 27 additions & 1 deletion common/persistence/persistence-tests/historyPersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,37 @@ func (s *HistoryPersistenceSuite) TestGetHistoryEvents() {
err0 := s.AppendHistoryEvents(domainID, workflowExecution, 1, common.EmptyVersion, 1, 1, batchEvents, false)
s.Nil(err0)

history, token, err1 := s.GetWorkflowExecutionHistory(domainID, workflowExecution, 1, 2, 10, nil)
// Here the nextEventID is set to 4 to make sure that if NextPageToken is set by persistence, we can get it here.
// Otherwise the middle layer will clear it. In this way, we can test that if the # of rows got from DB is less than
// page size, NextPageToken is empty.
history, token, err1 := s.GetWorkflowExecutionHistory(domainID, workflowExecution, 1, 4, 10, nil)
s.Nil(err1)
s.Equal(0, len(token))
s.Equal(2, len(history.Events))
s.Equal(int64(1), history.Events[0].GetVersion())

// We have only one page and the page size is set to one. In this case, persistence may or may not return a nextPageToken.
// If it does return a token, we need to ensure that if the token returned is used to get history again, no error and history
// events should be returned.
history, token, err1 = s.GetWorkflowExecutionHistory(domainID, workflowExecution, 1, 4, 1, nil)
s.Nil(err1)
s.Equal(2, len(history.Events))
if len(token) != 0 {
history, token, err1 = s.GetWorkflowExecutionHistory(domainID, workflowExecution, 1, 4, 1, token)
s.Nil(err1)
s.Equal(0, len(token))
s.Equal(0, len(history.Events))
}

// firstEventID is 2, since there's only one page and nextPageToken is empty,
// the call should return an error.
_, _, err2 := s.GetWorkflowExecutionHistory(domainID, workflowExecution, 2, 4, 1, nil)
s.IsType(&gen.EntityNotExistsError{}, err2)

// Get history of a workflow that doesn't exist.
workflowExecution.WorkflowId = common.StringPtr("some-random-id")
_, _, err2 = s.GetWorkflowExecutionHistory(domainID, workflowExecution, 1, 2, 1, nil)
s.IsType(&gen.EntityNotExistsError{}, err2)
}

func newBatchEventForTest(eventIDs []int64, version int64) *gen.History {
Expand Down
18 changes: 18 additions & 0 deletions common/persistence/persistence-tests/historyV2PersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
gen "github.com/uber/cadence/.gen/go/shared"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
Expand Down Expand Up @@ -260,14 +261,31 @@ func (s *HistoryV2PersistenceSuite) TestReadBranchByPagination() {
req.NextPageToken = resp.NextPageToken

// last page: one batch of 18-20
// We have only one page left and the page size is set to one. In this case,
// persistence may or may not return a nextPageToken.
// If it does return a token, we need to ensure that if the token returned is used
// to get history again, no error and history events should be returned.
req.PageSize = 1
resp, err = s.HistoryV2Mgr.ReadHistoryBranch(req)
s.Nil(err)
s.Equal(3, len(resp.HistoryEvents))
historyR.Events = append(historyR.Events, resp.HistoryEvents...)
req.NextPageToken = resp.NextPageToken
if len(resp.NextPageToken) != 0 {
resp, err = s.HistoryV2Mgr.ReadHistoryBranch(req)
s.Nil(err)
s.Equal(0, len(resp.HistoryEvents))
}

s.True(historyW.Equals(historyR))
s.Equal(0, len(resp.NextPageToken))

// MinEventID is in the middle of the last batch and this is the first request (NextPageToken
// is empty), the call should return an error.
req.MinEventID = 19
req.NextPageToken = nil
resp, err = s.HistoryV2Mgr.ReadHistoryBranch(req)
s.IsType(&gen.EntityNotExistsError{}, err)
}

//TestConcurrentlyCreateAndAppendBranches test
Expand Down
10 changes: 5 additions & 5 deletions common/persistence/sql/sqlHistoryManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,7 @@ func (m *sqlHistoryManager) GetWorkflowExecutionHistory(request *p.InternalGetWo

// TODO: Ensure that no last empty page is requested
if err == sql.ErrNoRows || (err == nil && len(rows) == 0) {
return nil, &workflow.EntityNotExistsError{
Message: fmt.Sprintf("Workflow execution history not found. WorkflowId: %v, RunId: %v",
*request.Execution.WorkflowId, *request.Execution.RunId),
}
return &p.InternalGetWorkflowExecutionHistoryResponse{}, nil
}

if err != nil {
Expand All @@ -134,7 +131,10 @@ func (m *sqlHistoryManager) GetWorkflowExecutionHistory(request *p.InternalGetWo
offset = v.FirstEventID
}

nextPageToken := serializePageToken(offset)
var nextPageToken []byte
if len(rows) >= request.PageSize {
nextPageToken = serializePageToken(offset)
}
return &p.InternalGetWorkflowExecutionHistoryResponse{
History: history,
LastEventBatchVersion: lastEventBatchVersion,
Expand Down

0 comments on commit d5d74d9

Please sign in to comment.