From ea292b5ee24bda224e1c1f385f50df479dbda218 Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Mon, 8 Apr 2019 14:32:43 -0700 Subject: [PATCH] Add special case handling for validate historyV1 continuousness with tests (#1672) * Add special case handling for validate historyV1 continuousness for getting history from remote * fix V1 and add unit test * add unit test for v2 * add integ test * fix pagination --- common/persistence/historyStore.go | 32 ++-- common/persistence/historyV2Store.go | 1 + .../historyPersistenceTest.go | 18 ++- .../historyV2PersistenceTest.go | 37 ++++- host/gethistory_test.go | 147 ++++++++++++++++++ 5 files changed, 218 insertions(+), 17 deletions(-) diff --git a/common/persistence/historyStore.go b/common/persistence/historyStore.go index f6e57c799be..15e27979577 100644 --- a/common/persistence/historyStore.go +++ b/common/persistence/historyStore.go @@ -24,10 +24,11 @@ import ( "encoding/json" "fmt" + "github.com/uber/cadence/common/logging" + "github.com/uber-common/bark" workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" - "github.com/uber/cadence/common/logging" ) type ( @@ -109,7 +110,8 @@ func (m *historyManagerImpl) GetWorkflowExecutionHistory(request *GetWorkflowExe // GetWorkflowExecutionHistory retrieves the paginated list of history events for given execution func (m *historyManagerImpl) getWorkflowExecutionHistory(request *GetWorkflowExecutionHistoryRequest, byBatch bool) ([]*workflow.History, *workflow.History, []byte, int64, int, error) { - token, err := m.deserializeToken(request) + defaultLastEventID := request.FirstEventID - 1 + token, err := m.deserializeToken(request, defaultLastEventID) if err != nil { return nil, nil, nil, 0, 0, err } @@ -150,13 +152,21 @@ func (m *historyManagerImpl) getWorkflowExecutionHistory(request *GetWorkflowExe } if len(historyBatch) == 0 || historyBatch[0].GetEventId() > token.LastEventID+1 { - logger := m.logger.WithFields(bark.Fields{ - 
logging.TagWorkflowExecutionID: request.Execution.GetWorkflowId(), - logging.TagWorkflowRunID: request.Execution.GetRunId(), - logging.TagDomainID: request.DomainID, - }) - logger.Error("Unexpected event batch") - return nil, nil, nil, 0, 0, fmt.Errorf("corrupted history event batch") + if defaultLastEventID == 0 || token.LastEventID != defaultLastEventID { + // We assume application layer want to read from MinEventID(inclusive) + // However, for getting history from remote cluster, there is scenario that we have to read from middle without knowing the firstEventID. + // In that case we don't validate history continuousness for the first page + // TODO: in this case, some events returned can be invalid(stale). application layer need to make sure it won't make any problems to XDC + logger := m.logger.WithFields(bark.Fields{ + logging.TagWorkflowExecutionID: request.Execution.GetWorkflowId(), + logging.TagWorkflowRunID: request.Execution.GetRunId(), + logging.TagDomainID: request.DomainID, + }) + logger.Error("Unexpected event batch") + return nil, nil, nil, 0, 0, fmt.Errorf("corrupted history event batch") + } else { + token.LastEventID = historyBatch[0].GetEventId() - 1 + } } if historyBatch[0].GetEventId() != token.LastEventID+1 { @@ -183,10 +193,10 @@ func (m *historyManagerImpl) getWorkflowExecutionHistory(request *GetWorkflowExe return historyBatches, history, nextToken, lastFirstEventID, size, nil } -func (m *historyManagerImpl) deserializeToken(request *GetWorkflowExecutionHistoryRequest) (*historyToken, error) { +func (m *historyManagerImpl) deserializeToken(request *GetWorkflowExecutionHistoryRequest, defaultLastEventID int64) (*historyToken, error) { token := &historyToken{ LastEventBatchVersion: common.EmptyVersion, - LastEventID: request.FirstEventID - 1, + LastEventID: defaultLastEventID, } if len(request.NextPageToken) == 0 { diff --git a/common/persistence/historyV2Store.go b/common/persistence/historyV2Store.go index 327fe89ba26..451dea4fa9d 100644 
--- a/common/persistence/historyV2Store.go +++ b/common/persistence/historyV2Store.go @@ -345,6 +345,7 @@ func (m *historyV2ManagerImpl) readHistoryBranch(byBatch bool, request *ReadHist // We assume application layer want to read from MinEventID(inclusive) // However, for getting history from remote cluster, there is scenario that we have to read from middle without knowing the firstEventID. // In that case we don't validate history continuousness for the first page + // TODO: in this case, some events returned can be invalid(stale). application layer need to make sure it won't make any problems to XDC if defaultLastEventID == 0 || token.LastEventID != defaultLastEventID { logger.Errorf("Corrupted incontinouous event batch, %v, %v, %v, %v, %v, %v", firstEvent.GetVersion(), lastEvent.GetVersion(), firstEvent.GetEventId(), lastEvent.GetEventId(), eventCount, token.LastEventID) return nil, nil, nil, 0, 0, &workflow.InternalServiceError{ diff --git a/common/persistence/persistence-tests/historyPersistenceTest.go b/common/persistence/persistence-tests/historyPersistenceTest.go index 53b391b22f6..97c1ceffd21 100644 --- a/common/persistence/persistence-tests/historyPersistenceTest.go +++ b/common/persistence/persistence-tests/historyPersistenceTest.go @@ -130,9 +130,9 @@ func (s *HistoryPersistenceSuite) TestGetHistoryEventsCompatibility() { batches := []*gen.History{ newBatchEventForTest([]int64{1, 2}, 1), newBatchEventForTest([]int64{3}, 1), - newBatchEventForTest([]int64{4, 5}, 1), - newBatchEventForTest([]int64{5}, 1), // staled batch, should be ignored - newBatchEventForTest([]int64{6, 7}, 1), + newBatchEventForTest([]int64{4, 5, 6}, 1), + newBatchEventForTest([]int64{6}, 1), // staled batch, should be ignored + newBatchEventForTest([]int64{7, 8}, 1), } for i, be := range batches { @@ -144,7 +144,7 @@ func (s *HistoryPersistenceSuite) TestGetHistoryEventsCompatibility() { history, token, err := s.GetWorkflowExecutionHistory(domainID, workflowExecution, 1, 8, 3, nil) 
s.Nil(err) s.NotNil(token) - s.Equal(5, len(history.Events)) + s.Equal(6, len(history.Events)) for i, e := range history.Events { s.Equal(int64(i+1), e.GetEventId()) } @@ -154,8 +154,18 @@ func (s *HistoryPersistenceSuite) TestGetHistoryEventsCompatibility() { s.Nil(err) s.Nil(token) s.Equal(2, len(history.Events)) + s.Equal(int64(7), history.Events[0].GetEventId()) + s.Equal(int64(8), history.Events[1].GetEventId()) + + // Start over, but read from middle, should not return error, but the first batch should be ignored by application layer + token = nil + history, token, err = s.GetWorkflowExecutionHistory(domainID, workflowExecution, 5, 8, 3, token) + s.Nil(err) + s.Nil(token) + s.Equal(3, len(history.Events)) s.Equal(int64(6), history.Events[0].GetEventId()) s.Equal(int64(7), history.Events[1].GetEventId()) + s.Equal(int64(8), history.Events[2].GetEventId()) } // TestDeleteHistoryEvents test diff --git a/common/persistence/persistence-tests/historyV2PersistenceTest.go b/common/persistence/persistence-tests/historyV2PersistenceTest.go index f4b7d5a5444..38cecb586f9 100644 --- a/common/persistence/persistence-tests/historyV2PersistenceTest.go +++ b/common/persistence/persistence-tests/historyV2PersistenceTest.go @@ -138,11 +138,38 @@ func (s *HistoryV2PersistenceSuite) TestReadBranchByPagination() { s.Nil(err) historyW.Events = append(historyW.Events, events...) + // stale event batch + events = s.genRandomEvents([]int64{6, 7, 8}, 0) + err = s.appendNewNode(bi, events, 1) + s.Nil(err) + // stale event batch + events = s.genRandomEvents([]int64{6, 7, 8}, 0) + err = s.appendNewNode(bi, events, 2) + s.Nil(err) + // stale event batch + events = s.genRandomEvents([]int64{6, 7, 8}, 1) + err = s.appendNewNode(bi, events, 3) + s.Nil(err) + events = s.genRandomEvents([]int64{9}, 1) err = s.appendNewNode(bi, events, 1) s.Nil(err) historyW.Events = append(historyW.Events, events...) 
+ // Start to read from middle, should not return error, but the first batch should be ignored by application layer + req := &p.ReadHistoryBranchRequest{ + BranchToken: bi, + MinEventID: 6, + MaxEventID: 10, + PageSize: 4, + NextPageToken: nil, + } + // first page + resp, err := s.HistoryV2Mgr.ReadHistoryBranch(req) + s.Nil(err) + s.Equal(4, len(resp.HistoryEvents)) + s.Equal(int64(6), resp.HistoryEvents[0].GetEventId()) + events = s.genRandomEvents([]int64{10}, 1) err = s.appendNewNode(bi, events, 1) s.Nil(err) @@ -191,7 +218,7 @@ func (s *HistoryV2PersistenceSuite) TestReadBranchByPagination() { //read branch to verify historyR := &workflow.History{} - req := &p.ReadHistoryBranchRequest{ + req = &p.ReadHistoryBranchRequest{ BranchToken: bi2, MinEventID: 1, MaxEventID: 21, @@ -199,12 +226,18 @@ func (s *HistoryV2PersistenceSuite) TestReadBranchByPagination() { NextPageToken: nil, } // first page - resp, err := s.HistoryV2Mgr.ReadHistoryBranch(req) + resp, err = s.HistoryV2Mgr.ReadHistoryBranch(req) s.Nil(err) s.Equal(8, len(resp.HistoryEvents)) historyR.Events = append(historyR.Events, resp.HistoryEvents...) req.NextPageToken = resp.NextPageToken + // this page is all stale batches + resp, err = s.HistoryV2Mgr.ReadHistoryBranch(req) + s.Nil(err) + s.Equal(0, len(resp.HistoryEvents)) + historyR.Events = append(historyR.Events, resp.HistoryEvents...) 
+ req.NextPageToken = resp.NextPageToken // second page resp, err = s.HistoryV2Mgr.ReadHistoryBranch(req) s.Nil(err) diff --git a/host/gethistory_test.go b/host/gethistory_test.go index b80ab0da228..e1afef9feb7 100644 --- a/host/gethistory_test.go +++ b/host/gethistory_test.go @@ -604,3 +604,150 @@ func (s *integrationSuite) TestGetWorkflowExecutionRawHistory_All() { s.True(len(blobs) == 2) s.True(len(events) == 3) } + +func (s *integrationSuite) TestGetWorkflowExecutionRawHistory_InTheMiddle() { + workflowID := "integration-get-workflow-history-raw-events-in-the-middle" + workflowTypeName := "integration-get-workflow-history-raw-events-in-the-middle-type" + tasklistName := "integration-get-workflow-history-raw-events-in-the-middle-tasklist" + identity := "worker1" + activityName := "activity_type1" + + workflowType := &workflow.WorkflowType{} + workflowType.Name = common.StringPtr(workflowTypeName) + + taskList := &workflow.TaskList{} + taskList.Name = common.StringPtr(tasklistName) + + // Start workflow execution + request := &workflow.StartWorkflowExecutionRequest{ + RequestId: common.StringPtr(uuid.New()), + Domain: common.StringPtr(s.domainName), + WorkflowId: common.StringPtr(workflowID), + WorkflowType: workflowType, + TaskList: taskList, + Input: nil, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1), + Identity: common.StringPtr(identity), + } + + we, err := s.engine.StartWorkflowExecution(createContext(), request) + s.Nil(err) + execution := &workflow.WorkflowExecution{ + WorkflowId: common.StringPtr(workflowID), + RunId: common.StringPtr(we.GetRunId()), + } + + s.Logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId) + + // decider logic + activityScheduled := false + activityData := int32(1) + // var signalEvent *workflow.HistoryEvent + dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType, + previousStartedEventID, startedEventID int64, history 
*workflow.History) ([]byte, []*workflow.Decision, error) { + + if !activityScheduled { + activityScheduled = true + buf := new(bytes.Buffer) + s.Nil(binary.Write(buf, binary.LittleEndian, activityData)) + + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask), + ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr(strconv.Itoa(int(1))), + ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)}, + TaskList: taskList, + Input: buf.Bytes(), + ScheduleToCloseTimeoutSeconds: common.Int32Ptr(100), + ScheduleToStartTimeoutSeconds: common.Int32Ptr(25), + StartToCloseTimeoutSeconds: common.Int32Ptr(50), + HeartbeatTimeoutSeconds: common.Int32Ptr(25), + }, + }}, nil + } + + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution), + CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{ + Result: []byte("Done."), + }, + }}, nil + } + + // activity handler + atHandler := func(execution *workflow.WorkflowExecution, activityType *workflow.ActivityType, + activityID string, input []byte, taskToken []byte) ([]byte, bool, error) { + + return []byte("Activity Result."), false, nil + } + + poller := &TaskPoller{ + Engine: s.engine, + Domain: s.domainName, + TaskList: taskList, + Identity: identity, + DecisionHandler: dtHandler, + ActivityHandler: atHandler, + Logger: s.Logger, + T: s.T(), + } + + getHistory := func(domain string, execution *workflow.WorkflowExecution, firstEventID int64, nextEventID int64, + token []byte) (*admin.GetWorkflowExecutionRawHistoryResponse, error) { + + return s.adminClient.GetWorkflowExecutionRawHistory(createContext(), &admin.GetWorkflowExecutionRawHistoryRequest{ + Domain: common.StringPtr(domain), + Execution: execution, + FirstEventId: common.Int64Ptr(firstEventID), + NextEventId: 
common.Int64Ptr(nextEventID), + MaximumPageSize: common.Int32Ptr(1), + NextPageToken: token, + }) + } + + // poll so workflow will make progress + poller.PollAndProcessDecisionTask(false, false) + // continue the workflow by processing activity + poller.PollAndProcessActivityTask(false) + // poll so workflow will make progress + poller.PollAndProcessDecisionTask(false, false) + + // now, there shall be 7 batches of events: + // 1. start event and decision task scheduled; + // 2. decision task started + // 3. decision task completed and activity task scheduled + // 4. activity task started + // 5. activity task completed and decision task scheduled + // 6. decision task started + // 7. decision task completed and workflow execution completed + + // try getting history from the middle to the end + firstEventID := int64(5) + var token []byte + // this should get the #4 batch, activity task started + resp, err := getHistory(s.domainName, execution, firstEventID, common.EndEventID, token) + s.Nil(err) + s.Equal(1, len(resp.HistoryBatches)) + token = resp.NextPageToken + s.NotEmpty(token) + + // this should get the #5 batch, activity task completed and decision task scheduled + resp, err = getHistory(s.domainName, execution, firstEventID, common.EndEventID, token) + s.Nil(err) + s.Equal(1, len(resp.HistoryBatches)) + token = resp.NextPageToken + s.NotEmpty(token) + + // this should get the #6 batch, decision task started + resp, err = getHistory(s.domainName, execution, firstEventID, common.EndEventID, token) + s.Nil(err) + s.Equal(1, len(resp.HistoryBatches)) + token = resp.NextPageToken + s.NotEmpty(token) + + // this should get the #7 batch, decision task completed and workflow execution completed + resp, err = getHistory(s.domainName, execution, firstEventID, common.EndEventID, token) + s.Nil(err) + s.Equal(1, len(resp.HistoryBatches)) +}