From 7b2e335153fc8c2a567ebb8bcec56ad5741c0a43 Mon Sep 17 00:00:00 2001 From: Andrew Dawson Date: Wed, 5 Jun 2019 15:47:42 -0700 Subject: [PATCH] Archival fix index out of bounds exception when reading to end of history (#1958) --- .../worker/archiver/history_blob_iterator.go | 13 +++ .../archiver/history_blob_iterator_test.go | 95 +++++++++++++++---- 2 files changed, 92 insertions(+), 16 deletions(-) diff --git a/service/worker/archiver/history_blob_iterator.go b/service/worker/archiver/history_blob_iterator.go index 4d7221daefe..ceaee0a10ae 100644 --- a/service/worker/archiver/history_blob_iterator.go +++ b/service/worker/archiver/history_blob_iterator.go @@ -175,6 +175,19 @@ func (i *historyBlobIterator) readBlobEvents(pageToken []byte) ([]*shared.Histor size += currSize nextPageToken = currNextPageToken } + // If nextPageToken is not empty it is still possible there are no more events. + // This occurs if history was read exactly to the last event. + // Here we look forward one page so that we can treat reading exactly to the end of history + // the same way as reading through the end of history. + if len(nextPageToken) > 0 { + lookAheadHistoryEvents, _, _, err := i.readHistory(nextPageToken) + if err != nil { + return nil, nil, false, err + } + if len(lookAheadHistoryEvents) == 0 { + nextPageToken = nil + } + } return historyEvents, nextPageToken, len(nextPageToken) == 0, nil } diff --git a/service/worker/archiver/history_blob_iterator_test.go b/service/worker/archiver/history_blob_iterator_test.go index 21a590e973e..402a963375a 100644 --- a/service/worker/archiver/history_blob_iterator_test.go +++ b/service/worker/archiver/history_blob_iterator_test.go @@ -146,7 +146,7 @@ func (s *HistoryBlobIteratorSuite) TestReadBlobEvents_Fail_FirstCallToReadHistor lastEventFailoverVersion: 1, }, } - historyManager, pageTokens := s.constructMockHistoryManager(0, pages...) + historyManager, pageTokens := s.constructMockHistoryManager(0, false, pages...) itr := s.constructTestHistoryBlobIterator(historyManager, nil, nil) startingIteratorState := s.copyIteratorState(itr) events, nextPageToken, historyEndReached, err := itr.readBlobEvents(pageTokens[0]) @@ -174,7 +174,7 @@ func (s *HistoryBlobIteratorSuite) TestReadBlobEvents_Fail_NonFirstCallToReadHis lastEventFailoverVersion: 1, }, } - historyManager, pageTokens := s.constructMockHistoryManager(1, pages...) + historyManager, pageTokens := s.constructMockHistoryManager(1, false, pages...) itr := s.constructTestHistoryBlobIterator(historyManager, nil, nil) startingIteratorState := s.copyIteratorState(itr) events, nextPageToken, historyEndReached, err := itr.readBlobEvents(pageTokens[0]) @@ -209,7 +209,7 @@ func (s *HistoryBlobIteratorSuite) TestReadBlobEvents_Success_ReadToHistoryEnd() lastEventFailoverVersion: 1, }, } - historyManager, pageTokens := s.constructMockHistoryManager(-1, pages...) + historyManager, pageTokens := s.constructMockHistoryManager(-1, false, pages...) // ensure target blob size is greater than total history length to ensure all of history is read config := constructConfig(testDefaultPersistencePageSize, 1000) itr := s.constructTestHistoryBlobIterator(historyManager, nil, config) @@ -247,7 +247,7 @@ func (s *HistoryBlobIteratorSuite) TestReadBlobEvents_Success_TargetSizeSatisfie lastEventFailoverVersion: 1, }, } - historyManager, pageTokens := s.constructMockHistoryManager(-1, pages...) + historyManager, pageTokens := s.constructMockHistoryManager(-1, false, pages...) // ensure target blob size is smaller than full length of history so that not all of history is read config := constructConfig(testDefaultPersistencePageSize, 250) itr := s.constructTestHistoryBlobIterator(historyManager, nil, config) @@ -286,7 +286,7 @@ func (s *HistoryBlobIteratorSuite) TestReadBlobEvents_Success_TargetSizeSatisfie lastEventFailoverVersion: 1, }, } - historyManager, pageTokens := s.constructMockHistoryManager(-1, pages...) + historyManager, pageTokens := s.constructMockHistoryManager(-1, false, pages...) // set target blob size such that all of history is read and the target blob size becomes satisfied upon reading last blob config := constructConfig(testDefaultPersistencePageSize, 301) itr := s.constructTestHistoryBlobIterator(historyManager, nil, config) @@ -300,6 +300,43 @@ func (s *HistoryBlobIteratorSuite) TestReadBlobEvents_Success_TargetSizeSatisfie s.assertStateMatches(startingIteratorState, itr) } +func (s *HistoryBlobIteratorSuite) TestReadBlobEvents_Success_ReadExactlyToHistoryEnd() { + pages := []page{ + { + size: 200, + numEvents: 10, + firstEventID: 1, + firstEventFailoverVersion: 1, + lastEventFailoverVersion: 1, + }, + { + size: 100, + numEvents: 15, + firstEventID: 11, + firstEventFailoverVersion: 1, + lastEventFailoverVersion: 1, + }, + { + size: 500, + numEvents: 50, + firstEventID: 26, + firstEventFailoverVersion: 1, + lastEventFailoverVersion: 1, + }, + } + historyManager, pageTokens := s.constructMockHistoryManager(-1, true, pages...) + config := constructConfig(testDefaultPersistencePageSize, 800) + itr := s.constructTestHistoryBlobIterator(historyManager, nil, config) + startingIteratorState := s.copyIteratorState(itr) + events, nextPageToken, historyEndReached, err := itr.readBlobEvents(pageTokens[0]) + s.NotNil(events) + s.Len(events, 75) + s.Len(nextPageToken, 0) + s.True(historyEndReached) + s.NoError(err) + s.assertStateMatches(startingIteratorState, itr) +} + func (s *HistoryBlobIteratorSuite) TestNext_Fail_IteratorDepleted() { pages := []page{ { @@ -324,7 +361,7 @@ func (s *HistoryBlobIteratorSuite) TestNext_Fail_IteratorDepleted() { lastEventFailoverVersion: 5, }, } - historyManager, _ := s.constructMockHistoryManager(-1, pages...) + historyManager, _ := s.constructMockHistoryManager(-1, true, pages...) // set target blob size such that a single call to next will read all of history config := constructConfig(testDefaultPersistencePageSize, 301) itr := s.constructTestHistoryBlobIterator(historyManager, nil, config) @@ -368,17 +405,24 @@ func (s *HistoryBlobIteratorSuite) TestNext_Fail_ReturnErrOnSecondCallToNext() { numEvents: 15, firstEventID: 11, firstEventFailoverVersion: 1, - lastEventFailoverVersion: 2, + lastEventFailoverVersion: 1, }, { - size: 500, - numEvents: 50, + size: 200, + numEvents: 10, firstEventID: 26, - firstEventFailoverVersion: 2, - lastEventFailoverVersion: 5, + firstEventFailoverVersion: 1, + lastEventFailoverVersion: 1, + }, + { + size: 100, + numEvents: 15, + firstEventID: 36, + firstEventFailoverVersion: 1, + lastEventFailoverVersion: 2, }, } - historyManager, pageTokens := s.constructMockHistoryManager(2, pages...) + historyManager, pageTokens := s.constructMockHistoryManager(3, false, pages...) // set target blob size such that the first two pages are read for blob one without error, third page will return error config := constructConfig(testDefaultPersistencePageSize, 250) itr := s.constructTestHistoryBlobIterator(historyManager, nil, config) @@ -394,7 +438,7 @@ func (s *HistoryBlobIteratorSuite) TestNext_Fail_ReturnErrOnSecondCallToNext() { s.Equal(common.FirstBlobPageToken, *blob.Header.CurrentPageToken) s.Equal(common.FirstBlobPageToken+1, *blob.Header.NextPageToken) s.Equal(int64(1), *blob.Header.FirstFailoverVersion) - s.Equal(int64(2), *blob.Header.LastFailoverVersion) + s.Equal(int64(1), *blob.Header.LastFailoverVersion) s.Equal(int64(1), *blob.Header.FirstEventID) s.Equal(int64(25), *blob.Header.LastEventID) s.NoError(err) @@ -418,7 +462,7 @@ func (s *HistoryBlobIteratorSuite) TestNext_Success_TenCallsToNext() { } pages = append(pages, p) } - historyManager, pageTokens := s.constructMockHistoryManager(-1, pages...) + historyManager, pageTokens := s.constructMockHistoryManager(-1, false, pages...) // set config such that every 10 persistence pages is one blob config := constructConfig(testDefaultPersistencePageSize, 10000) itr := s.constructTestHistoryBlobIterator(historyManager, nil, config) @@ -456,7 +500,7 @@ func (s *HistoryBlobIteratorSuite) TestNext_Success_TenCallsToNext() { s.False(itr.HasNext()) } -func (s *HistoryBlobIteratorSuite) constructMockHistoryManager(returnErrorOnPage int, pages ...page) (*mocks.HistoryManager, [][]byte) { +func (s *HistoryBlobIteratorSuite) constructMockHistoryManager(returnErrorOnPage int, lastPageHasNextPageToken bool, pages ...page) (*mocks.HistoryManager, [][]byte) { mockHistoryManager := &mocks.HistoryManager{} var pageTokens [][]byte for i, p := range pages { @@ -481,7 +525,7 @@ func (s *HistoryBlobIteratorSuite) constructMockHistoryManager(returnErrorOnPage return mockHistoryManager, pageTokens } var nextPageToken []byte - if i != len(pages)-1 { + if i != len(pages)-1 || lastPageHasNextPageToken { nextPageToken = []byte(fmt.Sprintf("%v", i+1)) } resp := &persistence.GetWorkflowExecutionHistoryResponse{ @@ -493,6 +537,25 @@ func (s *HistoryBlobIteratorSuite) constructMockHistoryManager(returnErrorOnPage } mockHistoryManager.On("GetWorkflowExecutionHistory", req).Return(resp, nil) } + if lastPageHasNextPageToken { + req := &persistence.GetWorkflowExecutionHistoryRequest{ + DomainID: testDomainID, + Execution: shared.WorkflowExecution{ + WorkflowId: common.StringPtr(testWorkflowID), + RunId: common.StringPtr(testRunID), + }, + FirstEventID: common.FirstEventID, + NextEventID: testNextEventID, + PageSize: testDefaultPersistencePageSize, + NextPageToken: []byte(fmt.Sprintf("%v", len(pages))), + } + resp := &persistence.GetWorkflowExecutionHistoryResponse{ + History: &shared.History{}, + NextPageToken: nil, + Size: 0, + } + mockHistoryManager.On("GetWorkflowExecutionHistory", req).Return(resp, nil) + } return mockHistoryManager, pageTokens }