Skip to content

Commit

Permalink
Add special case handling for validate historyV1 continuousness with…
Browse files Browse the repository at this point in the history
… tests (cadence-workflow#1672)

* Add special case handling for validate historyV1 continuousness for getting history from remote

* fix V1 and add unit test

* add unit test for v2

* add integ test

* fix pagination
  • Loading branch information
longquanzheng authored Apr 8, 2019
1 parent 8772b9a commit ea292b5
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 17 deletions.
32 changes: 21 additions & 11 deletions common/persistence/historyStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import (
"encoding/json"
"fmt"

"github.com/uber/cadence/common/logging"

"github.com/uber-common/bark"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/logging"
)

type (
Expand Down Expand Up @@ -109,7 +110,8 @@ func (m *historyManagerImpl) GetWorkflowExecutionHistory(request *GetWorkflowExe

// GetWorkflowExecutionHistory retrieves the paginated list of history events for given execution
func (m *historyManagerImpl) getWorkflowExecutionHistory(request *GetWorkflowExecutionHistoryRequest, byBatch bool) ([]*workflow.History, *workflow.History, []byte, int64, int, error) {
token, err := m.deserializeToken(request)
defaultLastEventID := request.FirstEventID - 1
token, err := m.deserializeToken(request, defaultLastEventID)
if err != nil {
return nil, nil, nil, 0, 0, err
}
Expand Down Expand Up @@ -150,13 +152,21 @@ func (m *historyManagerImpl) getWorkflowExecutionHistory(request *GetWorkflowExe
}

if len(historyBatch) == 0 || historyBatch[0].GetEventId() > token.LastEventID+1 {
logger := m.logger.WithFields(bark.Fields{
logging.TagWorkflowExecutionID: request.Execution.GetWorkflowId(),
logging.TagWorkflowRunID: request.Execution.GetRunId(),
logging.TagDomainID: request.DomainID,
})
logger.Error("Unexpected event batch")
return nil, nil, nil, 0, 0, fmt.Errorf("corrupted history event batch")
if defaultLastEventID == 0 || token.LastEventID != defaultLastEventID {
// We assume the application layer wants to read from FirstEventID (inclusive).
// However, when getting history from a remote cluster, there are scenarios where we have to read from the middle without knowing the firstEventID.
// In that case we don't validate history continuity for the first page.
// TODO: in this case, some of the returned events can be invalid (stale); the application layer needs to make sure this does not cause any problems for XDC.
logger := m.logger.WithFields(bark.Fields{
logging.TagWorkflowExecutionID: request.Execution.GetWorkflowId(),
logging.TagWorkflowRunID: request.Execution.GetRunId(),
logging.TagDomainID: request.DomainID,
})
logger.Error("Unexpected event batch")
return nil, nil, nil, 0, 0, fmt.Errorf("corrupted history event batch")
} else {
token.LastEventID = historyBatch[0].GetEventId() - 1
}
}

if historyBatch[0].GetEventId() != token.LastEventID+1 {
Expand All @@ -183,10 +193,10 @@ func (m *historyManagerImpl) getWorkflowExecutionHistory(request *GetWorkflowExe
return historyBatches, history, nextToken, lastFirstEventID, size, nil
}

func (m *historyManagerImpl) deserializeToken(request *GetWorkflowExecutionHistoryRequest) (*historyToken, error) {
func (m *historyManagerImpl) deserializeToken(request *GetWorkflowExecutionHistoryRequest, defaultLastEventID int64) (*historyToken, error) {
token := &historyToken{
LastEventBatchVersion: common.EmptyVersion,
LastEventID: request.FirstEventID - 1,
LastEventID: defaultLastEventID,
}

if len(request.NextPageToken) == 0 {
Expand Down
1 change: 1 addition & 0 deletions common/persistence/historyV2Store.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ func (m *historyV2ManagerImpl) readHistoryBranch(byBatch bool, request *ReadHist
// We assume the application layer wants to read from MinEventID (inclusive).
// However, when getting history from a remote cluster, there are scenarios where we have to read from the middle without knowing the firstEventID.
// In that case we don't validate history continuity for the first page.
// TODO: in this case, some of the returned events can be invalid (stale); the application layer needs to make sure this does not cause any problems for XDC.
if defaultLastEventID == 0 || token.LastEventID != defaultLastEventID {
logger.Errorf("Corrupted incontinouous event batch, %v, %v, %v, %v, %v, %v", firstEvent.GetVersion(), lastEvent.GetVersion(), firstEvent.GetEventId(), lastEvent.GetEventId(), eventCount, token.LastEventID)
return nil, nil, nil, 0, 0, &workflow.InternalServiceError{
Expand Down
18 changes: 14 additions & 4 deletions common/persistence/persistence-tests/historyPersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ func (s *HistoryPersistenceSuite) TestGetHistoryEventsCompatibility() {
batches := []*gen.History{
newBatchEventForTest([]int64{1, 2}, 1),
newBatchEventForTest([]int64{3}, 1),
newBatchEventForTest([]int64{4, 5}, 1),
newBatchEventForTest([]int64{5}, 1), // staled batch, should be ignored
newBatchEventForTest([]int64{6, 7}, 1),
newBatchEventForTest([]int64{4, 5, 6}, 1),
newBatchEventForTest([]int64{6}, 1), // staled batch, should be ignored
newBatchEventForTest([]int64{7, 8}, 1),
}

for i, be := range batches {
Expand All @@ -144,7 +144,7 @@ func (s *HistoryPersistenceSuite) TestGetHistoryEventsCompatibility() {
history, token, err := s.GetWorkflowExecutionHistory(domainID, workflowExecution, 1, 8, 3, nil)
s.Nil(err)
s.NotNil(token)
s.Equal(5, len(history.Events))
s.Equal(6, len(history.Events))
for i, e := range history.Events {
s.Equal(int64(i+1), e.GetEventId())
}
Expand All @@ -154,8 +154,18 @@ func (s *HistoryPersistenceSuite) TestGetHistoryEventsCompatibility() {
s.Nil(err)
s.Nil(token)
s.Equal(2, len(history.Events))
s.Equal(int64(7), history.Events[0].GetEventId())
s.Equal(int64(8), history.Events[1].GetEventId())

// Start over, but read from middle, should not return error, but the first batch should be ignored by application layer
token = nil
history, token, err = s.GetWorkflowExecutionHistory(domainID, workflowExecution, 5, 8, 3, token)
s.Nil(err)
s.Nil(token)
s.Equal(3, len(history.Events))
s.Equal(int64(6), history.Events[0].GetEventId())
s.Equal(int64(7), history.Events[1].GetEventId())
s.Equal(int64(8), history.Events[2].GetEventId())
}

// TestDeleteHistoryEvents test
Expand Down
37 changes: 35 additions & 2 deletions common/persistence/persistence-tests/historyV2PersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,38 @@ func (s *HistoryV2PersistenceSuite) TestReadBranchByPagination() {
s.Nil(err)
historyW.Events = append(historyW.Events, events...)

// stale event batch
events = s.genRandomEvents([]int64{6, 7, 8}, 0)
err = s.appendNewNode(bi, events, 1)
s.Nil(err)
// stale event batch
events = s.genRandomEvents([]int64{6, 7, 8}, 0)
err = s.appendNewNode(bi, events, 2)
s.Nil(err)
// stale event batch
events = s.genRandomEvents([]int64{6, 7, 8}, 1)
err = s.appendNewNode(bi, events, 3)
s.Nil(err)

events = s.genRandomEvents([]int64{9}, 1)
err = s.appendNewNode(bi, events, 1)
s.Nil(err)
historyW.Events = append(historyW.Events, events...)

// Start to read from middle, should not return error, but the first batch should be ignored by application layer
req := &p.ReadHistoryBranchRequest{
BranchToken: bi,
MinEventID: 6,
MaxEventID: 10,
PageSize: 4,
NextPageToken: nil,
}
// first page
resp, err := s.HistoryV2Mgr.ReadHistoryBranch(req)
s.Nil(err)
s.Equal(4, len(resp.HistoryEvents))
s.Equal(int64(6), resp.HistoryEvents[0].GetEventId())

events = s.genRandomEvents([]int64{10}, 1)
err = s.appendNewNode(bi, events, 1)
s.Nil(err)
Expand Down Expand Up @@ -191,20 +218,26 @@ func (s *HistoryV2PersistenceSuite) TestReadBranchByPagination() {
//read branch to verify
historyR := &workflow.History{}

req := &p.ReadHistoryBranchRequest{
req = &p.ReadHistoryBranchRequest{
BranchToken: bi2,
MinEventID: 1,
MaxEventID: 21,
PageSize: 3,
NextPageToken: nil,
}
// first page
resp, err := s.HistoryV2Mgr.ReadHistoryBranch(req)
resp, err = s.HistoryV2Mgr.ReadHistoryBranch(req)
s.Nil(err)
s.Equal(8, len(resp.HistoryEvents))
historyR.Events = append(historyR.Events, resp.HistoryEvents...)
req.NextPageToken = resp.NextPageToken

// this page is all stale batches
resp, err = s.HistoryV2Mgr.ReadHistoryBranch(req)
s.Nil(err)
s.Equal(0, len(resp.HistoryEvents))
historyR.Events = append(historyR.Events, resp.HistoryEvents...)
req.NextPageToken = resp.NextPageToken
// second page
resp, err = s.HistoryV2Mgr.ReadHistoryBranch(req)
s.Nil(err)
Expand Down
147 changes: 147 additions & 0 deletions host/gethistory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,3 +604,150 @@ func (s *integrationSuite) TestGetWorkflowExecutionRawHistory_All() {
s.True(len(blobs) == 2)
s.True(len(events) == 3)
}

// TestGetWorkflowExecutionRawHistory_InTheMiddle runs a workflow with a single
// activity to completion and then pages through its raw history via the admin
// API, starting from an event ID in the middle of the history (firstEventID = 5)
// instead of from the first event. Every page is requested with a maximum page
// size of 1 and is expected to succeed and to contain exactly one history batch.
func (s *integrationSuite) TestGetWorkflowExecutionRawHistory_InTheMiddle() {
workflowID := "integration-get-workflow-history-raw-events-in-the-middle"
workflowTypeName := "integration-get-workflow-history-raw-events-in-the-middle-type"
tasklistName := "integration-get-workflow-history-raw-events-in-the-middle-tasklist"
identity := "worker1"
activityName := "activity_type1"

workflowType := &workflow.WorkflowType{}
workflowType.Name = common.StringPtr(workflowTypeName)

taskList := &workflow.TaskList{}
taskList.Name = common.StringPtr(tasklistName)

// Start workflow execution
request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(workflowID),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
Identity: common.StringPtr(identity),
}

we, err := s.engine.StartWorkflowExecution(createContext(), request)
s.Nil(err)
// identifies the run that was just started; used for all history reads below
execution := &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(we.GetRunId()),
}

s.Logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId)

// decider logic: the first decision task schedules a single activity,
// every later decision task completes the workflow
activityScheduled := false
activityData := int32(1)
// var signalEvent *workflow.HistoryEvent
dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {

if !activityScheduled {
// flip the flag so the activity is scheduled only once
activityScheduled = true
// activity input is activityData encoded as a little-endian int32
buf := new(bytes.Buffer)
s.Nil(binary.Write(buf, binary.LittleEndian, activityData))

return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask),
ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{
ActivityId: common.StringPtr(strconv.Itoa(int(1))),
ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)},
TaskList: taskList,
Input: buf.Bytes(),
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(100),
ScheduleToStartTimeoutSeconds: common.Int32Ptr(25),
StartToCloseTimeoutSeconds: common.Int32Ptr(50),
HeartbeatTimeoutSeconds: common.Int32Ptr(25),
},
}}, nil
}

// activity already scheduled: complete the workflow
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{
Result: []byte("Done."),
},
}}, nil
}

// activity handler: ignores its input and always returns a fixed result
atHandler := func(execution *workflow.WorkflowExecution, activityType *workflow.ActivityType,
activityID string, input []byte, taskToken []byte) ([]byte, bool, error) {

return []byte("Activity Result."), false, nil
}

poller := &TaskPoller{
Engine: s.engine,
Domain: s.domainName,
TaskList: taskList,
Identity: identity,
DecisionHandler: dtHandler,
ActivityHandler: atHandler,
Logger: s.Logger,
T: s.T(),
}

// getHistory requests one page of raw history for the given execution through
// the admin client, passing the event ID range and page token through unchanged.
getHistory := func(domain string, execution *workflow.WorkflowExecution, firstEventID int64, nextEventID int64,
token []byte) (*admin.GetWorkflowExecutionRawHistoryResponse, error) {

// maximum page size is 1; the assertions below expect exactly one batch per page
return s.adminClient.GetWorkflowExecutionRawHistory(createContext(), &admin.GetWorkflowExecutionRawHistoryRequest{
Domain: common.StringPtr(domain),
Execution: execution,
FirstEventId: common.Int64Ptr(firstEventID),
NextEventId: common.Int64Ptr(nextEventID),
MaximumPageSize: common.Int32Ptr(1),
NextPageToken: token,
})
}

// poll so workflow will make progress
poller.PollAndProcessDecisionTask(false, false)
// continue the workflow by processing activity
poller.PollAndProcessActivityTask(false)
// poll so workflow will make progress
poller.PollAndProcessDecisionTask(false, false)

// now, there shall be 7 batches of events:
// 1. start event and decision task scheduled;
// 2. decision task started
// 3. decision task completed and activity task scheduled
// 4. activity task started
// 5. activity task completed and decision task scheduled
// 6. decision task started
// 7. decision task completed and workflow execution completed

// trying getting history from the middle to the end
// NOTE(review): presumably event 5 is the second event of batch #3, so the
// read begins at the next batch boundary (#4) - confirm against the store
firstEventID := int64(5)
var token []byte
// this should get the #4 batch, activity task started
resp, err := getHistory(s.domainName, execution, firstEventID, common.EndEventID, token)
s.Nil(err)
s.Equal(1, len(resp.HistoryBatches))
token = resp.NextPageToken
s.NotEmpty(token)

// this should get the #5 batch, activity task completed and decision task scheduled
resp, err = getHistory(s.domainName, execution, firstEventID, common.EndEventID, token)
s.Nil(err)
s.Equal(1, len(resp.HistoryBatches))
token = resp.NextPageToken
s.NotEmpty(token)

// this should get the #6 batch, decision task started
resp, err = getHistory(s.domainName, execution, firstEventID, common.EndEventID, token)
s.Nil(err)
s.Equal(1, len(resp.HistoryBatches))
token = resp.NextPageToken
s.NotEmpty(token)

// this should get the #7 batch, decision task completed and workflow execution completed
// (the next page token of this final response is not asserted)
resp, err = getHistory(s.domainName, execution, firstEventID, common.EndEventID, token)
s.Nil(err)
s.Equal(1, len(resp.HistoryBatches))
}

0 comments on commit ea292b5

Please sign in to comment.