Skip to content

Commit

Permalink
Archival fix index out of bounds exception when reading to end of his…
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Jun 5, 2019
1 parent 01c37e1 commit 7b2e335
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 16 deletions.
13 changes: 13 additions & 0 deletions service/worker/archiver/history_blob_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,19 @@ func (i *historyBlobIterator) readBlobEvents(pageToken []byte) ([]*shared.Histor
size += currSize
nextPageToken = currNextPageToken
}
// If nextPageToken is not empty it is still possible there are no more events.
// This occurs if history was read exactly to the last event.
// Here we look forward one page so that we can treat reading exactly to the end of history
// the same way as reading through the end of history.
if len(nextPageToken) > 0 {
lookAheadHistoryEvents, _, _, err := i.readHistory(nextPageToken)
if err != nil {
return nil, nil, false, err
}
if len(lookAheadHistoryEvents) == 0 {
nextPageToken = nil
}
}
return historyEvents, nextPageToken, len(nextPageToken) == 0, nil
}

Expand Down
95 changes: 79 additions & 16 deletions service/worker/archiver/history_blob_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (s *HistoryBlobIteratorSuite) TestReadBlobEvents_Fail_FirstCallToReadHistor
lastEventFailoverVersion: 1,
},
}
historyManager, pageTokens := s.constructMockHistoryManager(0, pages...)
historyManager, pageTokens := s.constructMockHistoryManager(0, false, pages...)
itr := s.constructTestHistoryBlobIterator(historyManager, nil, nil)
startingIteratorState := s.copyIteratorState(itr)
events, nextPageToken, historyEndReached, err := itr.readBlobEvents(pageTokens[0])
Expand Down Expand Up @@ -174,7 +174,7 @@ func (s *HistoryBlobIteratorSuite) TestReadBlobEvents_Fail_NonFirstCallToReadHis
lastEventFailoverVersion: 1,
},
}
historyManager, pageTokens := s.constructMockHistoryManager(1, pages...)
historyManager, pageTokens := s.constructMockHistoryManager(1, false, pages...)
itr := s.constructTestHistoryBlobIterator(historyManager, nil, nil)
startingIteratorState := s.copyIteratorState(itr)
events, nextPageToken, historyEndReached, err := itr.readBlobEvents(pageTokens[0])
Expand Down Expand Up @@ -209,7 +209,7 @@ func (s *HistoryBlobIteratorSuite) TestReadBlobEvents_Success_ReadToHistoryEnd()
lastEventFailoverVersion: 1,
},
}
historyManager, pageTokens := s.constructMockHistoryManager(-1, pages...)
historyManager, pageTokens := s.constructMockHistoryManager(-1, false, pages...)
// ensure target blob size is greater than total history length to ensure all of history is read
config := constructConfig(testDefaultPersistencePageSize, 1000)
itr := s.constructTestHistoryBlobIterator(historyManager, nil, config)
Expand Down Expand Up @@ -247,7 +247,7 @@ func (s *HistoryBlobIteratorSuite) TestReadBlobEvents_Success_TargetSizeSatisfie
lastEventFailoverVersion: 1,
},
}
historyManager, pageTokens := s.constructMockHistoryManager(-1, pages...)
historyManager, pageTokens := s.constructMockHistoryManager(-1, false, pages...)
// ensure target blob size is smaller than full length of history so that not all of history is read
config := constructConfig(testDefaultPersistencePageSize, 250)
itr := s.constructTestHistoryBlobIterator(historyManager, nil, config)
Expand Down Expand Up @@ -286,7 +286,7 @@ func (s *HistoryBlobIteratorSuite) TestReadBlobEvents_Success_TargetSizeSatisfie
lastEventFailoverVersion: 1,
},
}
historyManager, pageTokens := s.constructMockHistoryManager(-1, pages...)
historyManager, pageTokens := s.constructMockHistoryManager(-1, false, pages...)
// set target blob size such that all of history is read and the target blob size becomes satisfied upon reading last blob
config := constructConfig(testDefaultPersistencePageSize, 301)
itr := s.constructTestHistoryBlobIterator(historyManager, nil, config)
Expand All @@ -300,6 +300,43 @@ func (s *HistoryBlobIteratorSuite) TestReadBlobEvents_Success_TargetSizeSatisfie
s.assertStateMatches(startingIteratorState, itr)
}

func (s *HistoryBlobIteratorSuite) TestReadBlobEvents_Success_ReadExactlyToHistoryEnd() {
pages := []page{
{
size: 200,
numEvents: 10,
firstEventID: 1,
firstEventFailoverVersion: 1,
lastEventFailoverVersion: 1,
},
{
size: 100,
numEvents: 15,
firstEventID: 11,
firstEventFailoverVersion: 1,
lastEventFailoverVersion: 1,
},
{
size: 500,
numEvents: 50,
firstEventID: 26,
firstEventFailoverVersion: 1,
lastEventFailoverVersion: 1,
},
}
historyManager, pageTokens := s.constructMockHistoryManager(-1, true, pages...)
config := constructConfig(testDefaultPersistencePageSize, 800)
itr := s.constructTestHistoryBlobIterator(historyManager, nil, config)
startingIteratorState := s.copyIteratorState(itr)
events, nextPageToken, historyEndReached, err := itr.readBlobEvents(pageTokens[0])
s.NotNil(events)
s.Len(events, 75)
s.Len(nextPageToken, 0)
s.True(historyEndReached)
s.NoError(err)
s.assertStateMatches(startingIteratorState, itr)
}

func (s *HistoryBlobIteratorSuite) TestNext_Fail_IteratorDepleted() {
pages := []page{
{
Expand All @@ -324,7 +361,7 @@ func (s *HistoryBlobIteratorSuite) TestNext_Fail_IteratorDepleted() {
lastEventFailoverVersion: 5,
},
}
historyManager, _ := s.constructMockHistoryManager(-1, pages...)
historyManager, _ := s.constructMockHistoryManager(-1, true, pages...)
// set target blob size such that a single call to next will read all of history
config := constructConfig(testDefaultPersistencePageSize, 301)
itr := s.constructTestHistoryBlobIterator(historyManager, nil, config)
Expand Down Expand Up @@ -368,17 +405,24 @@ func (s *HistoryBlobIteratorSuite) TestNext_Fail_ReturnErrOnSecondCallToNext() {
numEvents: 15,
firstEventID: 11,
firstEventFailoverVersion: 1,
lastEventFailoverVersion: 2,
lastEventFailoverVersion: 1,
},
{
size: 500,
numEvents: 50,
size: 200,
numEvents: 10,
firstEventID: 26,
firstEventFailoverVersion: 2,
lastEventFailoverVersion: 5,
firstEventFailoverVersion: 1,
lastEventFailoverVersion: 1,
},
{
size: 100,
numEvents: 15,
firstEventID: 36,
firstEventFailoverVersion: 1,
lastEventFailoverVersion: 2,
},
}
historyManager, pageTokens := s.constructMockHistoryManager(2, pages...)
historyManager, pageTokens := s.constructMockHistoryManager(3, false, pages...)
// set target blob size such that the first two pages are read for blob one without error, third page will return error
config := constructConfig(testDefaultPersistencePageSize, 250)
itr := s.constructTestHistoryBlobIterator(historyManager, nil, config)
Expand All @@ -394,7 +438,7 @@ func (s *HistoryBlobIteratorSuite) TestNext_Fail_ReturnErrOnSecondCallToNext() {
s.Equal(common.FirstBlobPageToken, *blob.Header.CurrentPageToken)
s.Equal(common.FirstBlobPageToken+1, *blob.Header.NextPageToken)
s.Equal(int64(1), *blob.Header.FirstFailoverVersion)
s.Equal(int64(2), *blob.Header.LastFailoverVersion)
s.Equal(int64(1), *blob.Header.LastFailoverVersion)
s.Equal(int64(1), *blob.Header.FirstEventID)
s.Equal(int64(25), *blob.Header.LastEventID)
s.NoError(err)
Expand All @@ -418,7 +462,7 @@ func (s *HistoryBlobIteratorSuite) TestNext_Success_TenCallsToNext() {
}
pages = append(pages, p)
}
historyManager, pageTokens := s.constructMockHistoryManager(-1, pages...)
historyManager, pageTokens := s.constructMockHistoryManager(-1, false, pages...)
// set config such that every 10 persistence pages is one blob
config := constructConfig(testDefaultPersistencePageSize, 10000)
itr := s.constructTestHistoryBlobIterator(historyManager, nil, config)
Expand Down Expand Up @@ -456,7 +500,7 @@ func (s *HistoryBlobIteratorSuite) TestNext_Success_TenCallsToNext() {
s.False(itr.HasNext())
}

func (s *HistoryBlobIteratorSuite) constructMockHistoryManager(returnErrorOnPage int, pages ...page) (*mocks.HistoryManager, [][]byte) {
func (s *HistoryBlobIteratorSuite) constructMockHistoryManager(returnErrorOnPage int, lastPageHasNextPageToken bool, pages ...page) (*mocks.HistoryManager, [][]byte) {
mockHistoryManager := &mocks.HistoryManager{}
var pageTokens [][]byte
for i, p := range pages {
Expand All @@ -481,7 +525,7 @@ func (s *HistoryBlobIteratorSuite) constructMockHistoryManager(returnErrorOnPage
return mockHistoryManager, pageTokens
}
var nextPageToken []byte
if i != len(pages)-1 {
if i != len(pages)-1 || lastPageHasNextPageToken {
nextPageToken = []byte(fmt.Sprintf("%v", i+1))
}
resp := &persistence.GetWorkflowExecutionHistoryResponse{
Expand All @@ -493,6 +537,25 @@ func (s *HistoryBlobIteratorSuite) constructMockHistoryManager(returnErrorOnPage
}
mockHistoryManager.On("GetWorkflowExecutionHistory", req).Return(resp, nil)
}
if lastPageHasNextPageToken {
req := &persistence.GetWorkflowExecutionHistoryRequest{
DomainID: testDomainID,
Execution: shared.WorkflowExecution{
WorkflowId: common.StringPtr(testWorkflowID),
RunId: common.StringPtr(testRunID),
},
FirstEventID: common.FirstEventID,
NextEventID: testNextEventID,
PageSize: testDefaultPersistencePageSize,
NextPageToken: []byte(fmt.Sprintf("%v", len(pages))),
}
resp := &persistence.GetWorkflowExecutionHistoryResponse{
History: &shared.History{},
NextPageToken: nil,
Size: 0,
}
mockHistoryManager.On("GetWorkflowExecutionHistory", req).Return(resp, nil)
}
return mockHistoryManager, pageTokens
}

Expand Down

0 comments on commit 7b2e335

Please sign in to comment.