Skip to content

Commit

Permalink
Support Listing Scheduled Cron Workflows (cadence-workflow#4176)
Browse files Browse the repository at this point in the history
  • Loading branch information
demirkayaender authored May 7, 2021
1 parent a05ce6b commit fc63ab1
Show file tree
Hide file tree
Showing 46 changed files with 2,430 additions and 2,187 deletions.
52 changes: 48 additions & 4 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

285 changes: 143 additions & 142 deletions .gen/proto/admin/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

285 changes: 143 additions & 142 deletions .gen/proto/api/v1/decision.pb.yarpc.go

Large diffs are not rendered by default.

285 changes: 143 additions & 142 deletions .gen/proto/api/v1/history.pb.yarpc.go

Large diffs are not rendered by default.

285 changes: 143 additions & 142 deletions .gen/proto/api/v1/query.pb.yarpc.go

Large diffs are not rendered by default.

285 changes: 143 additions & 142 deletions .gen/proto/api/v1/service_visibility.pb.yarpc.go

Large diffs are not rendered by default.

285 changes: 143 additions & 142 deletions .gen/proto/api/v1/service_worker.pb.yarpc.go

Large diffs are not rendered by default.

285 changes: 143 additions & 142 deletions .gen/proto/api/v1/service_workflow.pb.yarpc.go

Large diffs are not rendered by default.

285 changes: 143 additions & 142 deletions .gen/proto/api/v1/visibility.pb.yarpc.go

Large diffs are not rendered by default.

328 changes: 185 additions & 143 deletions .gen/proto/api/v1/workflow.pb.go

Large diffs are not rendered by default.

285 changes: 143 additions & 142 deletions .gen/proto/api/v1/workflow.pb.yarpc.go

Large diffs are not rendered by default.

285 changes: 143 additions & 142 deletions .gen/proto/history/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

285 changes: 143 additions & 142 deletions .gen/proto/matching/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

285 changes: 143 additions & 142 deletions .gen/proto/shared/v1/error.pb.yarpc.go

Large diffs are not rendered by default.

285 changes: 143 additions & 142 deletions .gen/proto/shared/v1/history.pb.yarpc.go

Large diffs are not rendered by default.

285 changes: 143 additions & 142 deletions .gen/proto/shared/v1/replication.pb.yarpc.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions common/definition/indexedKeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
KafkaKey = "KafkaKey"
BinaryChecksums = "BinaryChecksums"
TaskList = "TaskList"
IsCron = "IsCron"

CustomStringField = "CustomStringField"
CustomKeywordField = "CustomKeywordField"
Expand Down Expand Up @@ -92,6 +93,7 @@ var systemIndexedKeys = map[string]interface{}{
CloseStatus: shared.IndexedValueTypeInt,
HistoryLength: shared.IndexedValueTypeInt,
TaskList: shared.IndexedValueTypeKeyword,
IsCron: shared.IndexedValueTypeBool,
}

// IsSystemIndexedKey return true is key is system added
Expand Down
1 change: 1 addition & 0 deletions common/elasticsearch/client_v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,7 @@ func (c *elasticV6) convertSearchResultToVisibilityRecord(hit *elastic.SearchHit
ExecutionTime: time.Unix(0, source.ExecutionTime),
Memo: p.NewDataBlob(source.Memo, common.EncodingType(source.Encoding)),
TaskList: source.TaskList,
IsCron: source.IsCron,
SearchAttributes: source.Attr,
}
if source.CloseTime != 0 {
Expand Down
1 change: 1 addition & 0 deletions common/elasticsearch/client_v7.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ func (c *elasticV7) convertSearchResultToVisibilityRecord(hit *elastic.SearchHit
ExecutionTime: time.Unix(0, source.ExecutionTime),
Memo: p.NewDataBlob(source.Memo, common.EncodingType(source.Encoding)),
TaskList: source.TaskList,
IsCron: source.IsCron,
SearchAttributes: source.Attr,
}
if source.CloseTime != 0 {
Expand Down
1 change: 1 addition & 0 deletions common/elasticsearch/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
Memo = "Memo"
Encoding = "Encoding"
TaskList = "TaskList"
IsCron = "IsCron"

KafkaKey = "KafkaKey"
)
Expand Down
1 change: 1 addition & 0 deletions common/elasticsearch/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ type (
Memo []byte
Encoding string
TaskList string
IsCron bool
Attr map[string]interface{}
}
)
7 changes: 5 additions & 2 deletions common/persistence/elasticsearch/decodeBench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ var (
"StartTime": 1547596872371000000,
"WorkflowID": "6bfbc1e5-6ce4-4e22-bbfb-e0faa9a7a604-1-2256",
"WorkflowType": "TestWorkflowExecute",
"Encoding" : "thriftrw",
"Encoding" : "thriftrw",
"TaskList" : "taskList",
"Memo" : "WQ0ACgsLAAAAAwAAAAJrMgAAAAkidmFuY2V4dSIAAAACazMAAAADMTIzAAAAAmsxAAAAUXsia2V5MSI6MTIzNDMyMSwia2V5MiI6ImEgc3RyaW5nIGlzIHZlcnkgbG9uZyIsIm1hcCI6eyJtS2V5IjoxMjM0MywiYXNkIjoiYXNkZiJ9fQA="}`)
"IsCron" : "false",
"Memo" : "WQ0ACgsLAAAAAwAAAAJrMgAAAAkidmFuY2V4dSIAAAACazMAAAADMTIzAAAAAmsxAAAAUXsia2V5MSI6MTIzNDMyMSwia2V5MiI6ImEgc3RyaW5nIGlzIHZlcnkgbG9uZyIsIm1hcCI6eyJtS2V5IjoxMjM0MywiYXNkIjoiYXNkZiJ9fQA="}`)
)

/*
Expand All @@ -69,6 +70,7 @@ func BenchmarkJSONDecodeToType(b *testing.B) {
ExecutionTime: time.Unix(0, source.ExecutionTime),
Memo: p.NewDataBlob(source.Memo, common.EncodingType(source.Encoding)),
TaskList: source.TaskList,
IsCron: source.IsCron,
}
record.CloseTime = time.Unix(0, source.CloseTime)
record.Status = thrift.ToWorkflowExecutionCloseStatus(&source.CloseStatus)
Expand Down Expand Up @@ -97,6 +99,7 @@ func BenchmarkJSONDecodeToMap(b *testing.B) {
StartTime: time.Unix(0, startTime),
ExecutionTime: time.Unix(0, executionTime),
TaskList: source[definition.TaskList].(string),
IsCron: source[definition.IsCron].(bool),
Memo: p.NewDataBlob([]byte(source[definition.Memo].(string)), common.EncodingType(source[definition.Encoding].(string))),
}
record.CloseTime = time.Unix(0, closeTime)
Expand Down
43 changes: 36 additions & 7 deletions common/persistence/elasticsearch/esVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func (v *esVisibilityStore) RecordWorkflowExecutionStarted(
request.TaskID,
request.Memo.Data,
request.Memo.GetEncoding(),
request.IsCron,
request.SearchAttributes,
)
return v.producer.Publish(ctx, msg)
Expand All @@ -127,6 +128,7 @@ func (v *esVisibilityStore) RecordWorkflowExecutionClosed(
request.Memo.Data,
request.TaskList,
request.Memo.GetEncoding(),
request.IsCron,
request.SearchAttributes,
)
return v.producer.Publish(ctx, msg)
Expand All @@ -148,6 +150,7 @@ func (v *esVisibilityStore) UpsertWorkflowExecution(
request.TaskID,
request.Memo.Data,
request.Memo.GetEncoding(),
request.IsCron,
request.SearchAttributes,
)
return v.producer.Publish(ctx, msg)
Expand Down Expand Up @@ -700,16 +703,28 @@ func (v *esVisibilityStore) checkProducer() {
}
}

func getVisibilityMessage(domainID string, wid, rid string, workflowTypeName string, taskList string,
startTimeUnixNano, executionTimeUnixNano int64, taskID int64, memo []byte, encoding common.EncodingType,
searchAttributes map[string][]byte) *indexer.Message {
func getVisibilityMessage(
domainID string,
wid,
rid string,
workflowTypeName string,
taskList string,
startTimeUnixNano,
executionTimeUnixNano int64,
taskID int64,
memo []byte,
encoding common.EncodingType,
isCron bool,
searchAttributes map[string][]byte,
) *indexer.Message {

msgType := indexer.MessageTypeIndex
fields := map[string]*indexer.Field{
es.WorkflowType: {Type: &es.FieldTypeString, StringData: common.StringPtr(workflowTypeName)},
es.StartTime: {Type: &es.FieldTypeInt, IntData: common.Int64Ptr(startTimeUnixNano)},
es.ExecutionTime: {Type: &es.FieldTypeInt, IntData: common.Int64Ptr(executionTimeUnixNano)},
es.TaskList: {Type: &es.FieldTypeString, StringData: common.StringPtr(taskList)},
es.IsCron: {Type: &es.FieldTypeBool, BoolData: common.BoolPtr(isCron)},
}
if len(memo) != 0 {
fields[es.Memo] = &indexer.Field{Type: &es.FieldTypeBinary, BinaryData: memo}
Expand All @@ -730,10 +745,23 @@ func getVisibilityMessage(domainID string, wid, rid string, workflowTypeName str
return msg
}

func getVisibilityMessageForCloseExecution(domainID string, wid, rid string, workflowTypeName string,
startTimeUnixNano int64, executionTimeUnixNano int64, endTimeUnixNano int64, closeStatus workflow.WorkflowExecutionCloseStatus,
historyLength int64, taskID int64, memo []byte, taskList string, encoding common.EncodingType,
searchAttributes map[string][]byte) *indexer.Message {
func getVisibilityMessageForCloseExecution(
domainID string,
wid,
rid string,
workflowTypeName string,
startTimeUnixNano int64,
executionTimeUnixNano int64,
endTimeUnixNano int64,
closeStatus workflow.WorkflowExecutionCloseStatus,
historyLength int64,
taskID int64,
memo []byte,
taskList string,
encoding common.EncodingType,
isCron bool,
searchAttributes map[string][]byte,
) *indexer.Message {

msgType := indexer.MessageTypeIndex
fields := map[string]*indexer.Field{
Expand All @@ -744,6 +772,7 @@ func getVisibilityMessageForCloseExecution(domainID string, wid, rid string, wor
es.CloseStatus: {Type: &es.FieldTypeInt, IntData: common.Int64Ptr(int64(closeStatus))},
es.HistoryLength: {Type: &es.FieldTypeInt, IntData: common.Int64Ptr(historyLength)},
es.TaskList: {Type: &es.FieldTypeString, StringData: common.StringPtr(taskList)},
es.IsCron: {Type: &es.FieldTypeBool, BoolData: common.BoolPtr(isCron)},
}
if len(memo) != 0 {
fields[es.Memo] = &indexer.Field{Type: &es.FieldTypeBinary, BinaryData: memo}
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/elasticsearch/esVisibilityStore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionStarted() {
request.StartTimestamp = time.Unix(0, int64(123))
request.ExecutionTimestamp = time.Unix(0, int64(321))
request.TaskID = int64(111)
request.IsCron = true
memoBytes := []byte(`test bytes`)
request.Memo = p.NewDataBlob(memoBytes, common.EncodingTypeThriftRW)

Expand All @@ -141,6 +142,7 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionStarted() {
s.Equal(request.ExecutionTimestamp.UnixNano(), fields[es.ExecutionTime].GetIntData())
s.Equal(memoBytes, fields[es.Memo].GetBinaryData())
s.Equal(string(common.EncodingTypeThriftRW), fields[es.Encoding].GetStringData())
s.Equal(request.IsCron, fields[es.IsCron].GetBoolData())
return true
})).Return(nil).Once()

Expand Down Expand Up @@ -188,6 +190,7 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionClosed() {
closeStatus := workflow.WorkflowExecutionCloseStatusTerminated
request.Status = *thrift.ToWorkflowExecutionCloseStatus(&closeStatus)
request.HistoryLength = int64(20)
request.IsCron = false
s.mockProducer.On("Publish", mock.Anything, mock.MatchedBy(func(input *indexer.Message) bool {
fields := input.Fields
s.Equal(request.DomainUUID, input.GetDomainID())
Expand Down
4 changes: 4 additions & 0 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,7 @@ type (
HistoryLength int64
Memo *DataBlob
TaskList string
IsCron bool
SearchAttributes map[string]interface{}
}

Expand Down Expand Up @@ -691,6 +692,7 @@ type (
TaskID int64
Memo *DataBlob
TaskList string
IsCron bool
SearchAttributes map[string][]byte
}

Expand All @@ -710,6 +712,7 @@ type (
Status types.WorkflowExecutionCloseStatus
HistoryLength int64
RetentionSeconds time.Duration
IsCron bool
}

// InternalUpsertWorkflowExecutionRequest is request to UpsertWorkflowExecution
Expand All @@ -724,6 +727,7 @@ type (
TaskID int64
Memo *DataBlob
TaskList string
IsCron bool
SearchAttributes map[string][]byte
}

Expand Down
3 changes: 3 additions & 0 deletions common/persistence/visibilityInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type (
TaskID int64 // not persisted, used as condition update version for ES
Memo *types.Memo
TaskList string
IsCron bool
SearchAttributes map[string][]byte
}

Expand All @@ -66,6 +67,7 @@ type (
TaskID int64 // not persisted, used as condition update version for ES
Memo *types.Memo
TaskList string
IsCron bool
SearchAttributes map[string][]byte
}

Expand All @@ -81,6 +83,7 @@ type (
TaskID int64 // not persisted, used as condition update version for ES
Memo *types.Memo
TaskList string
IsCron bool
SearchAttributes map[string][]byte
}

Expand Down
4 changes: 4 additions & 0 deletions common/persistence/visibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (v *visibilityManagerImpl) RecordWorkflowExecutionStarted(
WorkflowTimeout: common.SecondsToDuration(request.WorkflowTimeout),
TaskID: request.TaskID,
TaskList: request.TaskList,
IsCron: request.IsCron,
Memo: v.serializeMemo(request.Memo, request.DomainUUID, request.Execution.GetWorkflowID(), request.Execution.GetRunID()),
SearchAttributes: request.SearchAttributes,
}
Expand All @@ -100,6 +101,7 @@ func (v *visibilityManagerImpl) RecordWorkflowExecutionClosed(
Status: request.Status,
HistoryLength: request.HistoryLength,
RetentionSeconds: common.SecondsToDuration(request.RetentionSeconds),
IsCron: request.IsCron,
}
return v.persistence.RecordWorkflowExecutionClosed(ctx, req)
}
Expand All @@ -118,6 +120,7 @@ func (v *visibilityManagerImpl) UpsertWorkflowExecution(
TaskID: request.TaskID,
Memo: v.serializeMemo(request.Memo, request.DomainUUID, request.Execution.GetWorkflowID(), request.Execution.GetRunID()),
TaskList: request.TaskList,
IsCron: request.IsCron,
SearchAttributes: request.SearchAttributes,
}
return v.persistence.UpsertWorkflowExecution(ctx, req)
Expand Down Expand Up @@ -366,6 +369,7 @@ func (v *visibilityManagerImpl) convertVisibilityWorkflowExecutionInfo(execution
Memo: memo,
SearchAttributes: searchAttributes,
TaskList: execution.TaskList,
IsCron: execution.IsCron,
}

// for close records
Expand Down
2 changes: 2 additions & 0 deletions common/types/mapper/thrift/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -6039,6 +6039,7 @@ func FromWorkflowExecutionInfo(t *types.WorkflowExecutionInfo) *shared.WorkflowE
SearchAttributes: FromSearchAttributes(t.SearchAttributes),
AutoResetPoints: FromResetPoints(t.AutoResetPoints),
TaskList: &t.TaskList,
IsCron: &t.IsCron,
}
}

Expand All @@ -6061,6 +6062,7 @@ func ToWorkflowExecutionInfo(t *shared.WorkflowExecutionInfo) *types.WorkflowExe
SearchAttributes: ToSearchAttributes(t.SearchAttributes),
AutoResetPoints: ToResetPoints(t.AutoResetPoints),
TaskList: t.GetTaskList(),
IsCron: t.GetIsCron(),
}
}

Expand Down
9 changes: 9 additions & 0 deletions common/types/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -9520,6 +9520,7 @@ type WorkflowExecutionInfo struct {
SearchAttributes *SearchAttributes `json:"searchAttributes,omitempty"`
AutoResetPoints *ResetPoints `json:"autoResetPoints,omitempty"`
TaskList string `json:"taskList,omitempty"`
IsCron bool `json:"isCron,omitempty"`
}

// GetExecution is an internal getter (TBD...)
Expand Down Expand Up @@ -9626,6 +9627,14 @@ func (v *WorkflowExecutionInfo) GetTaskList() (o string) {
return
}

// GetIsCron is an internal getter (TBD...)
func (v *WorkflowExecutionInfo) GetIsCron() (o bool) {
if v != nil {
return v.IsCron
}
return
}

// WorkflowExecutionSignaledEventAttributes is an internal type (TBD...)
type WorkflowExecutionSignaledEventAttributes struct {
SignalName string `json:"signalName,omitempty"`
Expand Down
1 change: 1 addition & 0 deletions config/dynamicconfig/development_es.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ frontend.validSearchAttributes:
CloseStatus: 2
HistoryLength: 2
TaskList: 1
IsCron: 1
CustomStringField: 0
CustomKeywordField: 1
CustomIntField: 2
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ require (
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2
go.opencensus.io v0.22.5 // indirect
go.uber.org/atomic v1.5.1
go.uber.org/cadence v0.17.0
go.uber.org/atomic v1.7.0
go.uber.org/cadence v0.17.1-0.20210506170202-97f887f049c4
go.uber.org/fx v1.10.0
go.uber.org/multierr v1.4.0
go.uber.org/multierr v1.6.0
go.uber.org/thriftrw v1.25.0
go.uber.org/yarpc v1.52.0
go.uber.org/yarpc v1.53.2
go.uber.org/zap v1.13.0
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
Expand Down
Loading

0 comments on commit fc63ab1

Please sign in to comment.