Skip to content

Commit

Permalink
Allow custom sorting in ListAllWorkflowExecutions api (cadence-workfl…
Browse files Browse the repository at this point in the history
…ow#6130)

* Allow custom sorting in ListAllWorkflowExecutions api

* Update pinot_visibility_store.go
  • Loading branch information
sankari165 authored Jun 12, 2024
1 parent ecb93a1 commit 10969a0
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 11 deletions.
2 changes: 2 additions & 0 deletions common/persistence/dataVisibilityManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ type (
ListWorkflowExecutionsRequest
PartialMatch bool
WorkflowSearchValue string // This value will be searched across workflow type, workflow ID and runID
SortColumn string // This should be a valid search attribute
SortOrder string // DESC or ASC
}

// ListWorkflowExecutionsByQueryRequest is used to list executions in a domain
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/data_store_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,8 @@ type (
InternalListWorkflowExecutionsRequest
PartialMatch bool
WorkflowSearchValue string // This value will be searched across workflow type, workflow ID and runID
SortColumn string // This should be a valid search attribute
SortOrder string // DESC or ASC
}

// InternalGetClosedWorkflowExecutionResponse is response from GetWorkflowExecution
Expand Down
20 changes: 16 additions & 4 deletions common/persistence/pinot/pinot_visibility_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
const (
pinotPersistenceName = "pinot"
DescendingOrder = "DESC"
AcendingOrder = "ASC"
AscendingOrder = "ASC"
DomainID = "DomainID"
WorkflowID = "WorkflowID"
RunID = "RunID"
Expand Down Expand Up @@ -368,7 +368,7 @@ func (v *pinotVisibilityStore) ListAllWorkflowExecutions(ctx context.Context, re
return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime)
}

query, err := getListAllWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request)
query, err := v.getListAllWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request)
if err != nil {
v.logger.Error(fmt.Sprintf("failed to build list all workflow executions query %v", err))
return nil, err
Expand Down Expand Up @@ -1036,7 +1036,7 @@ func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWor
return query.String(), nil
}

func getListAllWorkflowExecutionsQuery(tableName string, request *p.InternalListAllWorkflowExecutionsByTypeRequest) (string, error) {
func (v *pinotVisibilityStore) getListAllWorkflowExecutionsQuery(tableName string, request *p.InternalListAllWorkflowExecutionsByTypeRequest) (string, error) {
if request == nil {
return "", nil
}
Expand All @@ -1052,7 +1052,12 @@ func getListAllWorkflowExecutionsQuery(tableName string, request *p.InternalList
query.filters.addTimeRange(StartTime, earliest, latest) // convert Unix Time to miliseconds
}

query.addPinotSorter(StartTime, DescendingOrder)
if v.validSortInput(request.SortColumn, request.SortOrder) {
query.addPinotSorter(request.SortColumn, request.SortOrder)
} else {
// fallback to sorting by StartTime in descending order
query.addPinotSorter(StartTime, DescendingOrder)
}

token, err := pnt.GetNextPageToken(request.NextPageToken)
if err != nil {
Expand Down Expand Up @@ -1081,6 +1086,13 @@ func getListAllWorkflowExecutionsQuery(tableName string, request *p.InternalList
return query.String(), nil
}

func (v *pinotVisibilityStore) validSortInput(sortColumn, sortOrder string) bool {
validSortColumn := v.pinotQueryValidator.IsValidSearchAttributes(sortColumn)
validSortOrder := sortOrder == DescendingOrder || sortOrder == AscendingOrder

return validSortColumn && validSortOrder
}

func getListWorkflowExecutionsByTypeQuery(tableName string, request *p.InternalListWorkflowExecutionsByTypeRequest, isClosed bool) (string, error) {
if request == nil {
return "", nil
Expand Down
72 changes: 69 additions & 3 deletions common/persistence/pinot/pinot_visibility_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1757,7 +1757,7 @@ func TestGetListAllWorkflowExecutionsQuery(t *testing.T) {
expectResult string
expectError error
}{
"complete request with exact match": {
"complete request with exact match and default sorting": {
inputRequest: &p.InternalListAllWorkflowExecutionsByTypeRequest{
InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{
DomainUUID: testDomainID,
Expand All @@ -1784,7 +1784,65 @@ LIMIT 0, 10
`, testTableName),
expectError: nil,
},
"complete request with partial match": {
"complete request with exact match with valid custom sorting": {
inputRequest: &p.InternalListAllWorkflowExecutionsByTypeRequest{
InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{
DomainUUID: testDomainID,
Domain: testDomain,
EarliestTime: time.Unix(0, testEarliestTime),
LatestTime: time.Unix(0, testLatestTime),
PageSize: testPageSize,
NextPageToken: nil,
},
PartialMatch: false,
WorkflowSearchValue: "123",
SortColumn: CloseTime,
SortOrder: AscendingOrder,
},
expectResult: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND StartTime BETWEEN 1547596871371 AND 2547596873371
AND ( WorkflowID = '123'
OR WorkflowType = '123'
OR RunID = '123'
)
Order BY CloseTime ASC
LIMIT 0, 10
`, testTableName),
expectError: nil,
},
"complete request with exact match with invalid custom sorting": {
inputRequest: &p.InternalListAllWorkflowExecutionsByTypeRequest{
InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{
DomainUUID: testDomainID,
Domain: testDomain,
EarliestTime: time.Unix(0, testEarliestTime),
LatestTime: time.Unix(0, testLatestTime),
PageSize: testPageSize,
NextPageToken: nil,
},
PartialMatch: false,
WorkflowSearchValue: "123",
SortColumn: "EndTime",
SortOrder: AscendingOrder,
},
expectResult: fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
AND IsDeleted = false
AND StartTime BETWEEN 1547596871371 AND 2547596873371
AND ( WorkflowID = '123'
OR WorkflowType = '123'
OR RunID = '123'
)
Order BY StartTime DESC
LIMIT 0, 10
`, testTableName),
expectError: nil,
},
"complete request with partial match and default sorting": {
inputRequest: &p.InternalListAllWorkflowExecutionsByTypeRequest{
InternalListWorkflowExecutionsRequest: p.InternalListWorkflowExecutionsRequest{
DomainUUID: testDomainID,
Expand Down Expand Up @@ -1823,9 +1881,17 @@ LIMIT 0, 0
expectError: nil,
},
}
ctrl := gomock.NewController(t)
mockPinotClient := pnt.NewMockGenericClient(ctrl)
mockProducer := &mocks.KafkaProducer{}
mgr := NewPinotVisibilityStore(mockPinotClient, &service.Config{
ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()),
ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3),
}, mockProducer, log.NewNoop())
visibilityStore := mgr.(*pinotVisibilityStore)
for name, test := range tests {
t.Run(name, func(t *testing.T) {
actualResult, actualError := getListAllWorkflowExecutionsQuery(testTableName, test.inputRequest)
actualResult, actualError := visibilityStore.getListAllWorkflowExecutionsQuery(testTableName, test.inputRequest)
assert.Equal(t, test.expectResult, actualResult)
assert.NoError(t, actualError)
})
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/visibility_single_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ func (v *visibilityManagerImpl) ListAllWorkflowExecutions(
internalRequest := &InternalListAllWorkflowExecutionsByTypeRequest{
PartialMatch: request.PartialMatch,
WorkflowSearchValue: request.WorkflowSearchValue,
SortColumn: request.SortColumn,
SortOrder: request.SortOrder,
}
if internalListRequest != nil {
internalRequest.InternalListWorkflowExecutionsRequest = *internalListRequest
Expand Down
8 changes: 4 additions & 4 deletions common/pinot/pinotQueryValidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (qv *VisibilityQueryValidator) validateRangeExpr(expr sqlparser.Expr) (stri
}
colNameStr := colName.Name.String()

if !qv.isValidSearchAttributes(colNameStr) {
if !qv.IsValidSearchAttributes(colNameStr) {
return "", fmt.Errorf("invalid search attribute %q", colNameStr)
}

Expand Down Expand Up @@ -209,7 +209,7 @@ func (qv *VisibilityQueryValidator) validateComparisonExpr(expr sqlparser.Expr)

colNameStr := colName.Name.String()

if !qv.isValidSearchAttributes(colNameStr) {
if !qv.IsValidSearchAttributes(colNameStr) {
return "", fmt.Errorf("invalid search attribute %q", colNameStr)
}

Expand All @@ -224,8 +224,8 @@ func (qv *VisibilityQueryValidator) validateComparisonExpr(expr sqlparser.Expr)
return qv.processCustomKey(expr)
}

// isValidSearchAttributes return true if key is registered
func (qv *VisibilityQueryValidator) isValidSearchAttributes(key string) bool {
// IsValidSearchAttributes return true if key is registered
func (qv *VisibilityQueryValidator) IsValidSearchAttributes(key string) bool {
validAttr := qv.validSearchAttributes
_, isValidKey := validAttr[key]
return isValidKey
Expand Down
2 changes: 2 additions & 0 deletions common/types/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -3488,6 +3488,8 @@ type ListAllWorkflowExecutionsRequest struct {
StartTimeFilter *StartTimeFilter `json:"StartTimeFilter,omitempty"`
PartialMatch bool `json:"partialMatch,omitempty"`
WorkflowSearchValue string `json:"workflowSearchValue,omitempty"`
SortColumn string `json:"sortColumn,omitempty"`
SortOrder string `json:"sortOrder,omitempty"`
}

func (v *ListAllWorkflowExecutionsRequest) SerializeForLogging() (string, error) {
Expand Down
2 changes: 2 additions & 0 deletions service/frontend/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3144,6 +3144,8 @@ func (wh *WorkflowHandler) ListAllWorkflowExecutions(
ListWorkflowExecutionsRequest: baseReq,
PartialMatch: listRequest.PartialMatch,
WorkflowSearchValue: listRequest.WorkflowSearchValue,
SortColumn: listRequest.SortColumn,
SortOrder: listRequest.SortOrder,
},
)
if err != nil {
Expand Down

0 comments on commit 10969a0

Please sign in to comment.