Skip to content

Commit

Permalink
Fix list visibility from ES last page token is not empty (cadence-wor…
Browse files Browse the repository at this point in the history
…kflow#1567)

* Fix list visibility from ES last page token

* avoid additional change
  • Loading branch information
vancexu authored Mar 20, 2019
1 parent d18a46b commit a8a2452
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 18 deletions.
28 changes: 15 additions & 13 deletions common/persistence/elasticsearch/elasticsearchVisibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (v *esVisibilityManager) ListOpenWorkflowExecutions(
}
}

return v.getListWorkflowExecutionsResponse(searchResult.Hits, token, isOpen)
return v.getListWorkflowExecutionsResponse(searchResult.Hits, token, isOpen, request.PageSize)
}

func (v *esVisibilityManager) ListClosedWorkflowExecutions(
Expand All @@ -124,7 +124,7 @@ func (v *esVisibilityManager) ListClosedWorkflowExecutions(
}
}

return v.getListWorkflowExecutionsResponse(searchResult.Hits, token, isOpen)
return v.getListWorkflowExecutionsResponse(searchResult.Hits, token, isOpen, request.PageSize)
}

func (v *esVisibilityManager) ListOpenWorkflowExecutionsByType(
Expand All @@ -144,7 +144,7 @@ func (v *esVisibilityManager) ListOpenWorkflowExecutionsByType(
}
}

return v.getListWorkflowExecutionsResponse(searchResult.Hits, token, isOpen)
return v.getListWorkflowExecutionsResponse(searchResult.Hits, token, isOpen, request.PageSize)
}

func (v *esVisibilityManager) ListClosedWorkflowExecutionsByType(
Expand All @@ -164,7 +164,7 @@ func (v *esVisibilityManager) ListClosedWorkflowExecutionsByType(
}
}

return v.getListWorkflowExecutionsResponse(searchResult.Hits, token, isOpen)
return v.getListWorkflowExecutionsResponse(searchResult.Hits, token, isOpen, request.PageSize)
}

func (v *esVisibilityManager) ListOpenWorkflowExecutionsByWorkflowID(
Expand All @@ -184,7 +184,7 @@ func (v *esVisibilityManager) ListOpenWorkflowExecutionsByWorkflowID(
}
}

return v.getListWorkflowExecutionsResponse(searchResult.Hits, token, isOpen)
return v.getListWorkflowExecutionsResponse(searchResult.Hits, token, isOpen, request.PageSize)
}

func (v *esVisibilityManager) ListClosedWorkflowExecutionsByWorkflowID(
Expand All @@ -204,7 +204,7 @@ func (v *esVisibilityManager) ListClosedWorkflowExecutionsByWorkflowID(
}
}

return v.getListWorkflowExecutionsResponse(searchResult.Hits, token, isOpen)
return v.getListWorkflowExecutionsResponse(searchResult.Hits, token, isOpen, request.PageSize)
}

func (v *esVisibilityManager) ListClosedWorkflowExecutionsByStatus(
Expand All @@ -224,7 +224,7 @@ func (v *esVisibilityManager) ListClosedWorkflowExecutionsByStatus(
}
}

return v.getListWorkflowExecutionsResponse(searchResult.Hits, token, isOpen)
return v.getListWorkflowExecutionsResponse(searchResult.Hits, token, isOpen, request.PageSize)
}

func (v *esVisibilityManager) GetClosedWorkflowExecution(
Expand Down Expand Up @@ -318,18 +318,20 @@ func (v *esVisibilityManager) getSearchResult(request *p.ListWorkflowExecutionsR
}

func (v *esVisibilityManager) getListWorkflowExecutionsResponse(searchHits *elastic.SearchHits,
token *esVisibilityPageToken, isOpen bool) (*p.ListWorkflowExecutionsResponse, error) {
token *esVisibilityPageToken, isOpen bool, pageSize int) (*p.ListWorkflowExecutionsResponse, error) {

response := &p.ListWorkflowExecutionsResponse{}
actualHits := searchHits.Hits
numOfActualHits := len(actualHits)

nextPageToken, err := v.serializePageToken(&esVisibilityPageToken{From: token.From + numOfActualHits})
if err != nil {
return nil, err
if numOfActualHits == pageSize {
nextPageToken, err := v.serializePageToken(&esVisibilityPageToken{From: token.From + numOfActualHits})
if err != nil {
return nil, err
}
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
}
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)

response.Executions = make([]*workflow.WorkflowExecutionInfo, 0)
for i := 0; i < numOfActualHits; i++ {
Expand Down
15 changes: 10 additions & 5 deletions common/persistence/elasticsearch/elasticsearchVisibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,13 +360,12 @@ func (s *ESVisibilitySuite) TestGetSearchResult() {
func (s *ESVisibilitySuite) TestGetListWorkflowExecutionsResponse() {
isOpen := true
token := &esVisibilityPageToken{From: 0}
serializedToken, _ := s.visibilityMgr.serializePageToken(token)

// test for empty hits
searchHits := &elastic.SearchHits{}
resp, err := s.visibilityMgr.getListWorkflowExecutionsResponse(searchHits, token, isOpen)
resp, err := s.visibilityMgr.getListWorkflowExecutionsResponse(searchHits, token, isOpen, 1)
s.NoError(err)
s.Equal(serializedToken, resp.NextPageToken)
s.Equal(0, len(resp.NextPageToken))
s.Equal(0, len(resp.Executions))

// test for one hits
Expand All @@ -384,11 +383,17 @@ func (s *ESVisibilitySuite) TestGetListWorkflowExecutionsResponse() {
Source: source,
}
searchHits.Hits = []*elastic.SearchHit{searchHit}
resp, err = s.visibilityMgr.getListWorkflowExecutionsResponse(searchHits, token, isOpen)
resp, err = s.visibilityMgr.getListWorkflowExecutionsResponse(searchHits, token, isOpen, 1)
s.NoError(err)
serializedToken, _ = s.visibilityMgr.serializePageToken(&esVisibilityPageToken{From: 1})
serializedToken, _ := s.visibilityMgr.serializePageToken(&esVisibilityPageToken{From: 1})
s.Equal(serializedToken, resp.NextPageToken)
s.Equal(1, len(resp.Executions))

// test for last page hits
resp, err = s.visibilityMgr.getListWorkflowExecutionsResponse(searchHits, token, isOpen, 2)
s.NoError(err)
s.Equal(0, len(resp.NextPageToken))
s.Equal(1, len(resp.Executions))
}

func (s *ESVisibilitySuite) TestDeserializePageToken() {
Expand Down

0 comments on commit a8a2452

Please sign in to comment.