Skip to content

Commit

Permalink
Parse time in visibility query string (cadence-workflow#1790)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored May 7, 2019
1 parent 3298cbf commit 2236052
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 2 deletions.
80 changes: 80 additions & 0 deletions common/persistence/elasticsearch/esVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,22 @@ const (
dslFieldFrom = "from"
dslFieldSize = "size"
dslFieldMust = "must"

defaultDateTimeFormat = time.RFC3339 // used for converting UnixNano to string like 2018-02-15T16:16:36-08:00
)

var (
timeKeys = map[string]bool{
"StartTime": true,
"CloseTime": true,
"ExecutionTime": true,
}
rangeKeys = map[string]bool{
"from": true,
"to": true,
"gt": true,
"lt": true,
}
)

func getESQueryDSLForScan(request *p.ListWorkflowExecutionsRequestV2, token *esVisibilityPageToken) (string, bool, error) {
Expand Down Expand Up @@ -512,6 +528,9 @@ func getCustomizedDSLFromSQL(sql string, domainID string) (*fastjson.Value, bool
addQueryForExecutionTime(dsl)
}
addDomainToQuery(dsl, domainID)
if err := processAllValuesForKey(dsl, timeKeyFilter, timeProcessFunc); err != nil {
return nil, false, err
}
return dsl, isOpen, nil
}

Expand Down Expand Up @@ -811,3 +830,64 @@ func checkPageSize(request *p.ListWorkflowExecutionsRequestV2) {
request.PageSize = 1000
}
}

func processAllValuesForKey(dsl *fastjson.Value, keyFilter func(k string) bool,
processFunc func(obj *fastjson.Object, key string, v *fastjson.Value) error,
) error {
switch dsl.Type() {
case fastjson.TypeArray:
for _, val := range dsl.GetArray() {
if err := processAllValuesForKey(val, keyFilter, processFunc); err != nil {
return err
}
}
case fastjson.TypeObject:
objectVal := dsl.GetObject()
keys := []string{}
objectVal.Visit(func(key []byte, val *fastjson.Value) {
keys = append(keys, string(key))
})

for _, key := range keys {
var err error
val := objectVal.Get(key)
if keyFilter(key) {
err = processFunc(objectVal, key, val)
} else {
err = processAllValuesForKey(val, keyFilter, processFunc)
}
if err != nil {
return err
}
}
default:
// do nothing, since there's no key
}
return nil
}

func timeKeyFilter(key string) bool {
return timeKeys[key]
}

func timeProcessFunc(obj *fastjson.Object, key string, value *fastjson.Value) error {
return processAllValuesForKey(value, func(key string) bool {
return rangeKeys[key]
}, func(obj *fastjson.Object, key string, v *fastjson.Value) error {
timeStr := string(v.GetStringBytes())

// first check if already in int64 format
if _, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
return nil
}

// try to parse time
parsedTime, err := time.Parse(defaultDateTimeFormat, timeStr)
if err != nil {
return err
}

obj.Set(key, fastjson.MustParse(fmt.Sprintf(`"%v"`, parsedTime.UnixNano())))
return nil
})
}
93 changes: 91 additions & 2 deletions common/persistence/elasticsearch/esVisibilityStore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ import (
"strings"
"testing"

"github.com/valyala/fastjson"

"github.com/olivere/elastic"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/valyala/fastjson"

"github.com/uber/cadence/.gen/go/indexer"
workflow "github.com/uber/cadence/.gen/go/shared"
Expand Down Expand Up @@ -701,6 +700,12 @@ func (s *ESVisibilitySuite) TestGetESQueryDSL() {
s.True(isOpen)
s.Equal(`{"query":{"bool":{"must":[{"match_phrase":{"DomainID":{"query":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}}}],"must_not":{"exists":{"field":"CloseTime"}}}},"from":0,"size":10,"sort":[{"CloseTime":"desc"},{"WorkflowID":"desc"}]}`, dsl)

request.Query = `WorkflowID = 'wid' and StartTime > "2018-06-07T15:04:05+00:00"`
dsl, isOpen, err = getESQueryDSL(request, token)
s.Nil(err)
s.False(isOpen)
s.Equal(`{"query":{"bool":{"must":[{"match_phrase":{"WorkflowID":{"query":"wid"}}},{"range":{"StartTime":{"gt":"1528383845000000000"}}},{"match_phrase":{"DomainID":{"query":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}}}]}},"from":0,"size":10,"sort":[{"CloseTime":"desc"},{"WorkflowID":"desc"}]}`, dsl)

request.Query = `ExecutionTime < 1000`
dsl, isOpen, err = getESQueryDSL(request, token)
s.Nil(err)
Expand All @@ -713,6 +718,10 @@ func (s *ESVisibilitySuite) TestGetESQueryDSL() {
s.False(isOpen)
s.Equal(`{"query":{"bool":{"should":[{"range":{"ExecutionTime":{"lt":"1000"}}},{"range":{"ExecutionTime":{"gt":"2000"}}}],"must":[{"range":{"ExecutionTime":{"gt":"0"}}},{"match_phrase":{"DomainID":{"query":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}}}]}},"from":0,"size":10,"sort":[{"CloseTime":"desc"},{"WorkflowID":"desc"}]}`, dsl)

request.Query = `ExecutionTime < "unable to parse"`
_, _, err = getESQueryDSL(request, token)
s.Error(err)

token = &esVisibilityPageToken{
SortTime: 1,
TieBreaker: "a",
Expand Down Expand Up @@ -743,6 +752,12 @@ func (s *ESVisibilitySuite) TestGetESQueryDSLForScan() {
s.False(isOpen)
s.Equal(`{"query":{"bool":{"must":[{"match_phrase":{"WorkflowID":{"query":"wid"}}},{"match_phrase":{"DomainID":{"query":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}}}]}},"from":0,"size":10}`, dsl)

request.Query = `CloseTime = missing and (ExecutionTime >= "2019-08-27T15:04:05+00:00" or StartTime <= "2018-06-07T15:04:05+00:00")`
dsl, isOpen, err = getESQueryDSLForScan(request, token)
s.Nil(err)
s.True(isOpen)
s.Equal(`{"query":{"bool":{"must":[{"bool":{"should":[{"range":{"ExecutionTime":{"from":"1566918245000000000"}}},{"range":{"StartTime":{"to":"1528383845000000000"}}}]}},{"range":{"ExecutionTime":{"gt":"0"}}},{"match_phrase":{"DomainID":{"query":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}}}],"must_not":{"exists":{"field":"CloseTime"}}}},"from":0,"size":10}`, dsl)

request.Query = `ExecutionTime < 1000 and ExecutionTime > 500`
dsl, isOpen, err = getESQueryDSLForScan(request, token)
s.Nil(err)
Expand All @@ -765,6 +780,11 @@ func (s *ESVisibilitySuite) TestGetESQueryDSLForCount() {
s.Nil(err)
s.Equal(`{"query":{"bool":{"must":[{"match_phrase":{"WorkflowID":{"query":"wid"}}},{"match_phrase":{"DomainID":{"query":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}}}]}}}`, dsl)

request.Query = `CloseTime < "2018-06-07T15:04:05+07:00" and StartTime > "2018-05-04T16:00:00+07:00" and ExecutionTime >= "2018-05-05T16:00:00+07:00"`
dsl, err = getESQueryDSLForCount(request)
s.Nil(err)
s.Equal(`{"query":{"bool":{"must":[{"range":{"CloseTime":{"lt":"1528358645000000000"}}},{"range":{"StartTime":{"gt":"1525424400000000000"}}},{"range":{"ExecutionTime":{"from":"1525510800000000000"}}},{"range":{"ExecutionTime":{"gt":"0"}}},{"match_phrase":{"DomainID":{"query":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}}}]}}}`, dsl)

request.Query = `ExecutionTime > 1000`
dsl, err = getESQueryDSLForCount(request)
s.Nil(err)
Expand Down Expand Up @@ -896,3 +916,72 @@ func (s *ESVisibilitySuite) TestCountWorkflowExecutions() {
s.True(ok)
s.True(strings.Contains(err.Error(), "Error when parse query"))
}

func (s *ESVisibilitySuite) TestTimeProcessFunc() {
cases := []struct {
key string
value string
}{
{key: "from", value: "1528358645000000000"},
{key: "to", value: "2018-06-07T15:04:05+07:00"},
{key: "gt", value: "some invalid time string"},
{key: "unrelatedKey", value: "should not be modified"},
}
expected := []struct {
value string
returnErr bool
}{
{value: `"1528358645000000000"`, returnErr: false},
{value: `"1528358645000000000"`},
{value: "", returnErr: true},
{value: `"should not be modified"`, returnErr: false},
}

for i, testCase := range cases {
value := fastjson.MustParse(fmt.Sprintf(`{"%s": "%s"}`, testCase.key, testCase.value))
err := timeProcessFunc(nil, "", value)
if expected[i].returnErr {
s.Error(err)
continue
}
s.Equal(expected[i].value, value.Get(testCase.key).String())
}
}

func (s *ESVisibilitySuite) TestProcessAllValuesForKey() {
testJSONStr := `{
"arrayKey": [
{"testKey1": "value1"},
{"testKey2": "value2"},
{"key3": "value3"}
],
"key4": {
"testKey5": "value5",
"key6": "value6"
},
"testArrayKey": [
{"testKey7": "should not be processed"}
],
"testKey8": "value8"
}`
dsl := fastjson.MustParse(testJSONStr)
testKeyFilter := func(key string) bool {
return strings.HasPrefix(key, "test")
}
processedValue := make(map[string]struct{})
testProcessFunc := func(obj *fastjson.Object, key string, value *fastjson.Value) error {
s.Equal(obj.Get(key), value)
processedValue[value.String()] = struct{}{}
return nil
}
processAllValuesForKey(dsl, testKeyFilter, testProcessFunc)

expectedProcessedValue := map[string]struct{}{
`"value1"`: struct{}{},
`"value2"`: struct{}{},
`"value5"`: struct{}{},
`[{"testKey7":"should not be processed"}]`: struct{}{},
`"value8"`: struct{}{},
}
s.Equal(expectedProcessedValue, processedValue)
}

0 comments on commit 2236052

Please sign in to comment.