Skip to content

Commit

Permalink
Add limit on pagesize for list APIs and fix last page overflow. (cade…
Browse files Browse the repository at this point in the history
…nce-workflow#1940)

* Fix last page overflow

* Add pagesize limit

* Add tests

* Update CLI to use pageToken instead of resultsize

* make fmt

* remove manual tests
  • Loading branch information
vancexu authored Jun 3, 2019
1 parent 4891aa8 commit 4d1e3ad
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 17 deletions.
2 changes: 1 addition & 1 deletion common/persistence/elasticsearch/esVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ func (v *esVisibilityStore) getListWorkflowExecutionsResponse(searchHits *elasti

// ES Search API supports pagination using From and PageSize, but From+PageSize cannot exceed a threshold;
// to retrieve deeper pages, use ES SearchAfter
if searchHits.TotalHits <= int64(v.config.ESIndexMaxResultWindow()) { // use ES Search From+Size
if searchHits.TotalHits <= int64(v.config.ESIndexMaxResultWindow()-pageSize) { // use ES Search From+Size
nextPageToken, err = v.serializePageToken(&esVisibilityPageToken{From: token.From + numOfActualHits})
} else { // use ES Search After
lastExecution := response.Executions[len(response.Executions)-1]
Expand Down
63 changes: 60 additions & 3 deletions host/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package host

import (
"context"
"encoding/json"
"flag"
"fmt"
Expand Down Expand Up @@ -106,7 +107,7 @@ func (s *elasticsearchIntegrationSuite) TestListOpenWorkflow() {
startFilter.LatestTime = common.Int64Ptr(time.Now().UnixNano())
resp, err := s.engine.ListOpenWorkflowExecutions(createContext(), &workflow.ListOpenWorkflowExecutionsRequest{
Domain: common.StringPtr(s.domainName),
MaximumPageSize: common.Int32Ptr(100),
MaximumPageSize: common.Int32Ptr(defaultTestValueOfESIndexMaxResultWindow),
StartTimeFilter: startFilter,
ExecutionFilter: &workflow.WorkflowExecutionFilter{
WorkflowId: common.StringPtr(id),
Expand Down Expand Up @@ -241,7 +242,7 @@ func (s *elasticsearchIntegrationSuite) TestListWorkflow_OrQuery() {
var openExecution *workflow.WorkflowExecutionInfo
listRequest := &workflow.ListWorkflowExecutionsRequest{
Domain: common.StringPtr(s.domainName),
PageSize: common.Int32Ptr(100),
PageSize: common.Int32Ptr(defaultTestValueOfESIndexMaxResultWindow),
Query: common.StringPtr(query1),
}
for i := 0; i < numOfRetry; i++ {
Expand Down Expand Up @@ -309,6 +310,62 @@ func (s *elasticsearchIntegrationSuite) TestListWorkflow_OrQuery() {
s.Equal(3, searchVal)
}

// TestListWorkflow_MaxWindowSize checks that asking for the page after the last one does not
// trigger the ES max result window error: it lowers the index's max_result_window to
// defaultTestValueOfESIndexMaxResultWindow, starts exactly that many workflows, reads them in a
// single full page, and then expects the follow-up request to return no executions and no token.
func (s *elasticsearchIntegrationSuite) TestListWorkflow_MaxWindowSize() {
	// Shrink the index's max_result_window so the limit is reachable with a small number of workflows.
	indexName := s.testClusterConfig.ESConfig.Indices[common.VisibilityAppName]
	_, err := s.esClient.IndexPutSettings(indexName).
		BodyString(fmt.Sprintf(`{"max_result_window" : %d}`, defaultTestValueOfESIndexMaxResultWindow)).
		Do(context.Background())
	s.NoError(err)

	// Revert the index setting in a defer so it is restored even when the test body panics or an
	// assertion aborts the test; otherwise the shrunken window would leak into later tests that
	// use the same index. 10000 is the value this test has always restored.
	// NOTE(review): presumably the Elasticsearch default for max_result_window — confirm.
	defer func() {
		_, revertErr := s.esClient.IndexPutSettings(indexName).
			BodyString(fmt.Sprintf(`{"max_result_window" : %d}`, 10000)).
			Do(context.Background())
		s.NoError(revertErr)
	}()

	id := "es-integration-list-workflow-max-window-size-test"
	wt := "es-integration-list-workflow-max-window-size-test-type"
	tl := "es-integration-list-workflow-max-window-size-test-tasklist"
	startRequest := s.createStartWorkflowExecutionRequest(id, wt, tl)

	// Start exactly max_result_window workflows, each with a unique request ID and workflow ID.
	for i := 0; i < defaultTestValueOfESIndexMaxResultWindow; i++ {
		startRequest.RequestId = common.StringPtr(uuid.New())
		startRequest.WorkflowId = common.StringPtr(id + strconv.Itoa(i))
		_, err = s.engine.StartWorkflowExecution(createContext(), startRequest)
		s.Nil(err)
	}

	// NextPageToken is left unset (nil) for the first page.
	listRequest := &workflow.ListWorkflowExecutionsRequest{
		Domain:   common.StringPtr(s.domainName),
		PageSize: common.Int32Ptr(int32(defaultTestValueOfESIndexMaxResultWindow)),
		Query:    common.StringPtr(fmt.Sprintf(`WorkflowType = '%s' and CloseTime = missing`, wt)),
	}

	// Get the first page; retry until all started workflows are visible in the listing.
	var listResp *workflow.ListWorkflowExecutionsResponse
	for i := 0; i < numOfRetry; i++ {
		resp, err := s.engine.ListWorkflowExecutions(createContext(), listRequest)
		s.Nil(err)
		if len(resp.GetExecutions()) == defaultTestValueOfESIndexMaxResultWindow {
			listResp = resp
			break
		}
		time.Sleep(waitTimeInMs * time.Millisecond)
	}
	s.True(len(listResp.GetNextPageToken()) != 0)

	// The last request: it starts right at the window boundary and must come back empty
	// without an error and without a further page token.
	listRequest.NextPageToken = listResp.GetNextPageToken()
	resp, err := s.engine.ListWorkflowExecutions(createContext(), listRequest)
	s.Nil(err)
	s.True(len(resp.GetExecutions()) == 0)
	s.True(len(resp.GetNextPageToken()) == 0)
}

func (s *elasticsearchIntegrationSuite) testListWorkflowHelper(numOfWorkflows, pageSize int,
startRequest *workflow.StartWorkflowExecutionRequest, wid, wType string, isScan bool) {

Expand Down Expand Up @@ -381,7 +438,7 @@ func (s *elasticsearchIntegrationSuite) testHelperForReadOnce(runID, query strin
var openExecution *workflow.WorkflowExecutionInfo
listRequest := &workflow.ListWorkflowExecutionsRequest{
Domain: common.StringPtr(s.domainName),
PageSize: common.Int32Ptr(100),
PageSize: common.Int32Ptr(defaultTestValueOfESIndexMaxResultWindow),
Query: common.StringPtr(query),
}
for i := 0; i < numOfRetry; i++ {
Expand Down
25 changes: 25 additions & 0 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2228,6 +2228,11 @@ func (wh *WorkflowHandler) ListOpenWorkflowExecutions(ctx context.Context,
listRequest.MaximumPageSize = common.Int32Ptr(int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain())))
}

if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
return nil, wh.error(&gen.BadRequestError{
Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}, scope)
}

domain := listRequest.GetDomain()
domainID, err := wh.domainCache.GetDomainID(domain)
if err != nil {
Expand Down Expand Up @@ -2334,6 +2339,11 @@ func (wh *WorkflowHandler) ListClosedWorkflowExecutions(ctx context.Context,
listRequest.MaximumPageSize = common.Int32Ptr(int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain())))
}

if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
return nil, wh.error(&gen.BadRequestError{
Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}, scope)
}

domain := listRequest.GetDomain()
domainID, err := wh.domainCache.GetDomainID(domain)
if err != nil {
Expand Down Expand Up @@ -2425,6 +2435,11 @@ func (wh *WorkflowHandler) ListWorkflowExecutions(ctx context.Context, listReque
listRequest.PageSize = common.Int32Ptr(int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain())))
}

if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
return nil, wh.error(&gen.BadRequestError{
Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}, scope)
}

if err := wh.visibilityQueryValidator.ValidateListRequestForQuery(listRequest); err != nil {
return nil, wh.error(err, scope)
}
Expand Down Expand Up @@ -2480,6 +2495,11 @@ func (wh *WorkflowHandler) ScanWorkflowExecutions(ctx context.Context, listReque
listRequest.PageSize = common.Int32Ptr(int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain())))
}

if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
return nil, wh.error(&gen.BadRequestError{
Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}, scope)
}

if err := wh.visibilityQueryValidator.ValidateListRequestForQuery(listRequest); err != nil {
return nil, wh.error(err, scope)
}
Expand Down Expand Up @@ -3316,3 +3336,8 @@ func (wh *WorkflowHandler) validateSearchAttributes(input *gen.SearchAttributes,

return nil
}

// isListRequestPageSizeTooLarge reports whether pageSize exceeds the configured
// ES index max result window. It only applies to domains that read visibility
// from ElasticSearch; for every other domain it returns false.
func (wh *WorkflowHandler) isListRequestPageSizeTooLarge(pageSize int32, domain string) bool {
	if !wh.config.EnableReadVisibilityFromES(domain) {
		return false
	}
	// Widen both sides to int64 instead of narrowing the configured window to int32:
	// a window larger than math.MaxInt32 would otherwise wrap around and make the
	// comparison reject valid page sizes (or accept oversized ones).
	return int64(pageSize) > int64(wh.config.ESIndexMaxResultWindow())
}
17 changes: 13 additions & 4 deletions service/frontend/workflowHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type (
testDomain string
testDomainID string
logger log.Logger
config *Config
mockClusterMetadata *mocks.ClusterMetadata
mockProducer *mocks.KafkaProducer
mockMetricClient metrics.Client
Expand Down Expand Up @@ -128,8 +129,8 @@ func (s *workflowHandlerSuite) getWorkflowHandler(config *Config) *WorkflowHandl
}

func (s *workflowHandlerSuite) getWorkflowHandlerHelper() *WorkflowHandler {
config := s.newConfig()
wh := s.getWorkflowHandler(config)
s.config = s.newConfig()
wh := s.getWorkflowHandler(s.config)
wh.metricsClient = wh.Service.GetMetricsClient()
wh.domainCache = s.mockDomainCache
wh.visibilityMgr = s.mockVisibilityMgr
Expand Down Expand Up @@ -1219,7 +1220,7 @@ func (s *workflowHandlerSuite) TestListWorkflowExecutions() {

listRequest := &shared.ListWorkflowExecutionsRequest{
Domain: common.StringPtr(s.testDomain),
PageSize: common.Int32Ptr(10),
PageSize: common.Int32Ptr(int32(s.config.ESIndexMaxResultWindow())),
}
ctx := context.Background()

Expand All @@ -1233,6 +1234,10 @@ func (s *workflowHandlerSuite) TestListWorkflowExecutions() {
listRequest.Query = common.StringPtr(query)
_, err = wh.ListWorkflowExecutions(ctx, listRequest)
s.NotNil(err)

listRequest.PageSize = common.Int32Ptr(int32(s.config.ESIndexMaxResultWindow() + 1))
_, err = wh.ListWorkflowExecutions(ctx, listRequest)
s.NotNil(err)
}

func (s *workflowHandlerSuite) TestScantWorkflowExecutions() {
Expand All @@ -1243,7 +1248,7 @@ func (s *workflowHandlerSuite) TestScantWorkflowExecutions() {

listRequest := &shared.ListWorkflowExecutionsRequest{
Domain: common.StringPtr(s.testDomain),
PageSize: common.Int32Ptr(10),
PageSize: common.Int32Ptr(int32(s.config.ESIndexMaxResultWindow())),
}
ctx := context.Background()

Expand All @@ -1257,6 +1262,10 @@ func (s *workflowHandlerSuite) TestScantWorkflowExecutions() {
listRequest.Query = common.StringPtr(query)
_, err = wh.ScanWorkflowExecutions(ctx, listRequest)
s.NotNil(err)

// A page size one above the ES max result window must be rejected by the Scan API itself
// (this test previously called ListWorkflowExecutions here, leaving the Scan limit untested).
listRequest.PageSize = common.Int32Ptr(int32(s.config.ESIndexMaxResultWindow() + 1))
_, err = wh.ScanWorkflowExecutions(ctx, listRequest)
s.NotNil(err)
}

func (s *workflowHandlerSuite) TestCountWorkflowExecutions() {
Expand Down
14 changes: 5 additions & 9 deletions tools/cli/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,6 @@ func queryWorkflowHelper(c *cli.Context, queryType string) {
// ListWorkflow list workflow executions based on filters
func ListWorkflow(c *cli.Context) {
more := c.Bool(FlagMore)
pageSize := c.Int(FlagPageSize)
queryOpen := c.Bool(FlagOpen)

printJSON := c.Bool(FlagPrintJSON)
Expand All @@ -881,14 +880,13 @@ func ListWorkflow(c *cli.Context) {
prepareTable(nil)
table.Render()
} else { // require input Enter to view next page
var resultSize int
var nextPageToken []byte
for {
nextPageToken, resultSize = prepareTable(nextPageToken)
nextPageToken, _ = prepareTable(nextPageToken)
table.Render()
table.ClearRows()

if resultSize < pageSize {
if len(nextPageToken) == 0 {
break
}

Expand Down Expand Up @@ -919,8 +917,7 @@ func ListAllWorkflow(c *cli.Context) {
for {
results, nextPageToken = getListResultInRaw(c, queryOpen, nextPageToken)
printListResults(results, printJSON)
//printListResultsInJson(results)
if len(results) < defaultPageSizeForList {
if len(nextPageToken) == 0 {
break
}
}
Expand All @@ -930,11 +927,10 @@ func ListAllWorkflow(c *cli.Context) {

table := createTableForListWorkflow(c, true, queryOpen)
prepareTable := listWorkflow(c, table, queryOpen)
var resultSize int
var nextPageToken []byte
for {
nextPageToken, resultSize = prepareTable(nextPageToken)
if resultSize < defaultPageSizeForList {
nextPageToken, _ = prepareTable(nextPageToken)
if len(nextPageToken) == 0 {
break
}
}
Expand Down

0 comments on commit 4d1e3ad

Please sign in to comment.