Skip to content

Commit

Permalink
Support filtering global domains in ElasticSearch (cadence-workflow#4539
Browse files Browse the repository at this point in the history
)
  • Loading branch information
demirkayaender authored Oct 15, 2021
1 parent f8f95d5 commit 87b2eae
Show file tree
Hide file tree
Showing 46 changed files with 284 additions and 53 deletions.
92 changes: 76 additions & 16 deletions .gen/go/sqlblobs/sqlblobs.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion common/config/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (m *ClusterGroupMetadata) Validate() error {
if m == nil {
return errors.New("ClusterGroupMetadata cannot be empty")
}

if !m.EnableGlobalDomain {
log.Println("[WARN] Local domain is now deprecated. Please update config to enable global domain(ClusterGroupMetadata->EnableGlobalDomain)." +
"Global domain of single cluster has zero overhead, but only advantages for future migration and fail over. Please check Cadence documentation for more details.")
Expand Down
6 changes: 3 additions & 3 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ type (
// 1) "noop" will not do any forwarding.
//
// 2) "all-domain-apis-forwarding" will forward all domain specific APIs(worker and non worker) if the current active domain is
// the same as "allDomainApisForwardingTargetCluster"( or "allDomainApisForwardingTargetCluster" is empty), otherwise it fallbacks to "selected-apis-forwarding".
// the same as "allDomainApisForwardingTargetCluster"( or "allDomainApisForwardingTargetCluster" is empty), otherwise it fallbacks to "selected-apis-forwarding".
//
// 3) "selected-apis-forwarding" will forward all non-worker APIs including
// 1. StartWorkflowExecution
Expand All @@ -361,10 +361,10 @@ type (
//
// 1) If the network communication overhead is high(e.g., clusters are in remote datacenters of different region), then should use "selected-apis-forwarding".
// But you must ensure a different set of workers with the same workflow & activity code are connected to each Cadence cluster.
//
//
// 2) If the network communication overhead is low (e.g. in the same datacenter, mostly for cluster migration usage), then you can use "all-domain-apis-forwarding". Then only one set of
// workflow & activity worker connected of one of the Cadence cluster is enough as all domain APIs are forwarded. See more details in documentation of cluster migration section.
// Usually "allDomainApisForwardingTargetCluster" should be empty(default value) except for very rare cases: you have more than two clusters and some are in a remote region but some are in local region.
// Usually "allDomainApisForwardingTargetCluster" should be empty(default value) except for very rare cases: you have more than two clusters and some are in a remote region but some are in local region.
Policy string `yaml:"policy"`
// A supplement for "all-domain-apis-forwarding" policy. It decides how the policy fallback to "selected-apis-forwarding" policy.
// If this is not empty, and current domain is not active in the value of allDomainApisForwardingTargetCluster, then the policy will fall back to "selected-apis-forwarding" policy.
Expand Down
2 changes: 2 additions & 0 deletions common/definition/indexedKeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
BinaryChecksums = "BinaryChecksums"
TaskList = "TaskList"
IsCron = "IsCron"
NumClusters = "NumClusters"

CustomStringField = "CustomStringField"
CustomKeywordField = "CustomKeywordField"
Expand Down Expand Up @@ -94,6 +95,7 @@ var systemIndexedKeys = map[string]interface{}{
HistoryLength: shared.IndexedValueTypeInt,
TaskList: shared.IndexedValueTypeKeyword,
IsCron: shared.IndexedValueTypeBool,
NumClusters: shared.IndexedValueTypeInt,
}

// IsSystemIndexedKey returns true if key is system added
Expand Down
1 change: 1 addition & 0 deletions common/elasticsearch/client_v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,7 @@ func (c *elasticV6) convertSearchResultToVisibilityRecord(hit *elastic.SearchHit
Memo: p.NewDataBlob(source.Memo, common.EncodingType(source.Encoding)),
TaskList: source.TaskList,
IsCron: source.IsCron,
NumClusters: source.NumClusters,
SearchAttributes: source.Attr,
}
if source.CloseTime != 0 {
Expand Down
1 change: 1 addition & 0 deletions common/elasticsearch/client_v7.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,7 @@ func (c *elasticV7) convertSearchResultToVisibilityRecord(hit *elastic.SearchHit
Memo: p.NewDataBlob(source.Memo, common.EncodingType(source.Encoding)),
TaskList: source.TaskList,
IsCron: source.IsCron,
NumClusters: source.NumClusters,
SearchAttributes: source.Attr,
}
if source.CloseTime != 0 {
Expand Down
1 change: 1 addition & 0 deletions common/elasticsearch/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
Encoding = "Encoding"
TaskList = "TaskList"
IsCron = "IsCron"
NumClusters = "NumClusters"

KafkaKey = "KafkaKey"
)
Expand Down
1 change: 1 addition & 0 deletions common/elasticsearch/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ type (
Encoding string
TaskList string
IsCron bool
NumClusters int16
Attr map[string]interface{}
}
)
4 changes: 4 additions & 0 deletions common/persistence/dataStoreInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,7 @@ type (
Memo *DataBlob
TaskList string
IsCron bool
NumClusters int16
SearchAttributes map[string]interface{}
}

Expand Down Expand Up @@ -693,6 +694,7 @@ type (
Memo *DataBlob
TaskList string
IsCron bool
NumClusters int16
SearchAttributes map[string][]byte
}

Expand All @@ -713,6 +715,7 @@ type (
HistoryLength int64
RetentionSeconds time.Duration
IsCron bool
NumClusters int16
}

// InternalUpsertWorkflowExecutionRequest is request to UpsertWorkflowExecution
Expand All @@ -728,6 +731,7 @@ type (
Memo *DataBlob
TaskList string
IsCron bool
NumClusters int16
SearchAttributes map[string][]byte
}

Expand Down
3 changes: 3 additions & 0 deletions common/persistence/dataVisibilityManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type (
Memo *types.Memo
TaskList string
IsCron bool
NumClusters int16
SearchAttributes map[string][]byte
}

Expand All @@ -68,6 +69,7 @@ type (
Memo *types.Memo
TaskList string
IsCron bool
NumClusters int16
SearchAttributes map[string][]byte
}

Expand All @@ -84,6 +86,7 @@ type (
Memo *types.Memo
TaskList string
IsCron bool
NumClusters int16
SearchAttributes map[string][]byte
}

Expand Down
3 changes: 3 additions & 0 deletions common/persistence/elasticsearch/decodeBench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var (
"Encoding" : "thriftrw",
"TaskList" : "taskList",
"IsCron" : "false",
"NumClusters" : "2",
"Memo" : "WQ0ACgsLAAAAAwAAAAJrMgAAAAkidmFuY2V4dSIAAAACazMAAAADMTIzAAAAAmsxAAAAUXsia2V5MSI6MTIzNDMyMSwia2V5MiI6ImEgc3RyaW5nIGlzIHZlcnkgbG9uZyIsIm1hcCI6eyJtS2V5IjoxMjM0MywiYXNkIjoiYXNkZiJ9fQA="}`)
)

Expand All @@ -71,6 +72,7 @@ func BenchmarkJSONDecodeToType(b *testing.B) {
Memo: p.NewDataBlob(source.Memo, common.EncodingType(source.Encoding)),
TaskList: source.TaskList,
IsCron: source.IsCron,
NumClusters: source.NumClusters,
}
record.CloseTime = time.Unix(0, source.CloseTime)
record.Status = thrift.ToWorkflowExecutionCloseStatus(&source.CloseStatus)
Expand Down Expand Up @@ -100,6 +102,7 @@ func BenchmarkJSONDecodeToMap(b *testing.B) {
ExecutionTime: time.Unix(0, executionTime),
TaskList: source[definition.TaskList].(string),
IsCron: source[definition.IsCron].(bool),
NumClusters: source[definition.NumClusters].(int16),
Memo: p.NewDataBlob([]byte(source[definition.Memo].(string)), common.EncodingType(source[definition.Encoding].(string))),
}
record.CloseTime = time.Unix(0, closeTime)
Expand Down
7 changes: 7 additions & 0 deletions common/persistence/elasticsearch/esVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (v *esVisibilityStore) RecordWorkflowExecutionStarted(
request.Memo.Data,
request.Memo.GetEncoding(),
request.IsCron,
request.NumClusters,
request.SearchAttributes,
)
return v.producer.Publish(ctx, msg)
Expand All @@ -129,6 +130,7 @@ func (v *esVisibilityStore) RecordWorkflowExecutionClosed(
request.TaskList,
request.Memo.GetEncoding(),
request.IsCron,
request.NumClusters,
request.SearchAttributes,
)
return v.producer.Publish(ctx, msg)
Expand All @@ -151,6 +153,7 @@ func (v *esVisibilityStore) UpsertWorkflowExecution(
request.Memo.Data,
request.Memo.GetEncoding(),
request.IsCron,
request.NumClusters,
request.SearchAttributes,
)
return v.producer.Publish(ctx, msg)
Expand Down Expand Up @@ -715,6 +718,7 @@ func getVisibilityMessage(
memo []byte,
encoding common.EncodingType,
isCron bool,
NumClusters int16,
searchAttributes map[string][]byte,
) *indexer.Message {

Expand All @@ -725,6 +729,7 @@ func getVisibilityMessage(
es.ExecutionTime: {Type: &es.FieldTypeInt, IntData: common.Int64Ptr(executionTimeUnixNano)},
es.TaskList: {Type: &es.FieldTypeString, StringData: common.StringPtr(taskList)},
es.IsCron: {Type: &es.FieldTypeBool, BoolData: common.BoolPtr(isCron)},
es.NumClusters: {Type: &es.FieldTypeInt, IntData: common.Int64Ptr(int64(NumClusters))},
}
if len(memo) != 0 {
fields[es.Memo] = &indexer.Field{Type: &es.FieldTypeBinary, BinaryData: memo}
Expand Down Expand Up @@ -760,6 +765,7 @@ func getVisibilityMessageForCloseExecution(
taskList string,
encoding common.EncodingType,
isCron bool,
NumClusters int16,
searchAttributes map[string][]byte,
) *indexer.Message {

Expand All @@ -773,6 +779,7 @@ func getVisibilityMessageForCloseExecution(
es.HistoryLength: {Type: &es.FieldTypeInt, IntData: common.Int64Ptr(historyLength)},
es.TaskList: {Type: &es.FieldTypeString, StringData: common.StringPtr(taskList)},
es.IsCron: {Type: &es.FieldTypeBool, BoolData: common.BoolPtr(isCron)},
es.NumClusters: {Type: &es.FieldTypeInt, IntData: common.Int64Ptr(int64(NumClusters))},
}
if len(memo) != 0 {
fields[es.Memo] = &indexer.Field{Type: &es.FieldTypeBinary, BinaryData: memo}
Expand Down
5 changes: 5 additions & 0 deletions common/persistence/elasticsearch/esVisibilityStore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionStarted() {
request.ExecutionTimestamp = time.Unix(0, int64(321))
request.TaskID = int64(111)
request.IsCron = true
request.NumClusters = 2
memoBytes := []byte(`test bytes`)
request.Memo = p.NewDataBlob(memoBytes, common.EncodingTypeThriftRW)

Expand All @@ -143,6 +144,7 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionStarted() {
s.Equal(memoBytes, fields[es.Memo].GetBinaryData())
s.Equal(string(common.EncodingTypeThriftRW), fields[es.Encoding].GetStringData())
s.Equal(request.IsCron, fields[es.IsCron].GetBoolData())
s.Equal((int64)(request.NumClusters), fields[es.NumClusters].GetIntData())
return true
})).Return(nil).Once()

Expand Down Expand Up @@ -191,6 +193,7 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionClosed() {
request.Status = *thrift.ToWorkflowExecutionCloseStatus(&closeStatus)
request.HistoryLength = int64(20)
request.IsCron = false
request.NumClusters = 2
s.mockProducer.On("Publish", mock.Anything, mock.MatchedBy(func(input *indexer.Message) bool {
fields := input.Fields
s.Equal(request.DomainUUID, input.GetDomainID())
Expand All @@ -205,6 +208,8 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionClosed() {
s.Equal(request.CloseTimestamp.UnixNano(), fields[es.CloseTime].GetIntData())
s.Equal(int64(closeStatus), fields[es.CloseStatus].GetIntData())
s.Equal(request.HistoryLength, fields[es.HistoryLength].GetIntData())
s.Equal(request.IsCron, fields[es.IsCron].GetBoolData())
s.Equal((int64)(request.NumClusters), fields[es.NumClusters].GetIntData())
return true
})).Return(nil).Once()

Expand Down
2 changes: 2 additions & 0 deletions common/persistence/nosql/nosqlVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (v *nosqlVisibilityStore) RecordWorkflowExecutionStarted(
Memo: request.Memo,
TaskList: request.TaskList,
IsCron: request.IsCron,
NumClusters: request.NumClusters,
},
})
if err != nil {
Expand Down Expand Up @@ -113,6 +114,7 @@ func (v *nosqlVisibilityStore) RecordWorkflowExecutionClosed(
Memo: request.Memo,
TaskList: request.TaskList,
IsCron: request.IsCron,
NumClusters: request.NumClusters,
//closed workflow attributes
Status: &request.Status,
CloseTime: request.CloseTimestamp,
Expand Down
30 changes: 20 additions & 10 deletions common/persistence/nosql/nosqlplugin/cassandra/visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ const (

const (
///////////////// Open Executions /////////////////
openExecutionsColumnsForSelect = " workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, task_list, is_cron "
openExecutionsColumnsForSelect = " workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, task_list, is_cron, num_clusters "

openExecutionsColumnsForInsert = "(domain_id, domain_partition, " + openExecutionsColumnsForSelect + ")"

templateCreateWorkflowExecutionStartedWithTTL = `INSERT INTO open_executions ` +
openExecutionsColumnsForInsert +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`

templateCreateWorkflowExecutionStarted = `INSERT INTO open_executions` +
openExecutionsColumnsForInsert +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

templateDeleteWorkflowExecutionStarted = `DELETE FROM open_executions ` +
`WHERE domain_id = ? ` +
Expand Down Expand Up @@ -81,25 +81,25 @@ const (
`AND workflow_id = ? `

///////////////// Closed Executions /////////////////
closedExecutionColumnsForSelect = " workflow_id, run_id, start_time, execution_time, close_time, workflow_type_name, status, history_length, memo, encoding, task_list, is_cron "
closedExecutionColumnsForSelect = " workflow_id, run_id, start_time, execution_time, close_time, workflow_type_name, status, history_length, memo, encoding, task_list, is_cron, num_clusters "

closedExecutionColumnsForInsert = "(domain_id, domain_partition, " + closedExecutionColumnsForSelect + ")"

templateCreateWorkflowExecutionClosedWithTTL = `INSERT INTO closed_executions ` +
closedExecutionColumnsForInsert +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`

templateCreateWorkflowExecutionClosed = `INSERT INTO closed_executions ` +
closedExecutionColumnsForInsert +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

templateCreateWorkflowExecutionClosedWithTTLV2 = `INSERT INTO closed_executions_v2 ` +
closedExecutionColumnsForInsert +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`

templateCreateWorkflowExecutionClosedV2 = `INSERT INTO closed_executions_v2 ` +
closedExecutionColumnsForInsert +
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

templateGetClosedWorkflowExecutions = `SELECT ` + closedExecutionColumnsForSelect +
`FROM closed_executions ` +
Expand Down Expand Up @@ -188,6 +188,7 @@ func (db *cdb) InsertVisibility(ctx context.Context, ttlSeconds int64, row *nosq
row.Memo.GetEncoding(),
row.TaskList,
row.IsCron,
row.NumClusters,
).WithContext(ctx)
} else {
query = db.session.Query(templateCreateWorkflowExecutionStartedWithTTL,
Expand All @@ -202,6 +203,7 @@ func (db *cdb) InsertVisibility(ctx context.Context, ttlSeconds int64, row *nosq
row.Memo.GetEncoding(),
row.TaskList,
row.IsCron,
row.NumClusters,
ttlSeconds,
).WithContext(ctx)
}
Expand Down Expand Up @@ -244,6 +246,7 @@ func (db *cdb) UpdateVisibility(ctx context.Context, ttlSeconds int64, row *nosq
row.Memo.GetEncoding(),
row.TaskList,
row.IsCron,
row.NumClusters,
)
// duplicate write to v2 to order by close time
batch.Query(templateCreateWorkflowExecutionClosedV2,
Expand All @@ -261,6 +264,7 @@ func (db *cdb) UpdateVisibility(ctx context.Context, ttlSeconds int64, row *nosq
row.Memo.GetEncoding(),
row.TaskList,
row.IsCron,
row.NumClusters,
)
} else {
batch.Query(templateCreateWorkflowExecutionClosedWithTTL,
Expand All @@ -278,6 +282,7 @@ func (db *cdb) UpdateVisibility(ctx context.Context, ttlSeconds int64, row *nosq
row.Memo.GetEncoding(),
row.TaskList,
row.IsCron,
row.NumClusters,
ttlSeconds,
)
// duplicate write to v2 to order by close time
Expand All @@ -296,6 +301,7 @@ func (db *cdb) UpdateVisibility(ctx context.Context, ttlSeconds int64, row *nosq
row.Memo.GetEncoding(),
row.TaskList,
row.IsCron,
row.NumClusters,
ttlSeconds,
)
}
Expand Down Expand Up @@ -606,7 +612,8 @@ func readOpenWorkflowExecutionRecord(
var encoding string
var taskList string
var isCron bool
if iter.Scan(&workflowID, &runID, &startTime, &executionTime, &typeName, &memo, &encoding, &taskList, &isCron) {
var numClusters int16
if iter.Scan(&workflowID, &runID, &startTime, &executionTime, &typeName, &memo, &encoding, &taskList, &isCron, &numClusters) {
record := &persistence.InternalVisibilityWorkflowExecutionInfo{
WorkflowID: workflowID,
RunID: runID,
Expand All @@ -616,6 +623,7 @@ func readOpenWorkflowExecutionRecord(
Memo: persistence.NewDataBlob(memo, common.EncodingType(encoding)),
TaskList: taskList,
IsCron: isCron,
NumClusters: numClusters,
}
return record, true
}
Expand All @@ -637,7 +645,8 @@ func readClosedWorkflowExecutionRecord(
var encoding string
var taskList string
var isCron bool
if iter.Scan(&workflowID, &runID, &startTime, &executionTime, &closeTime, &typeName, &status, &historyLength, &memo, &encoding, &taskList, &isCron) {
var numClusters int16
if iter.Scan(&workflowID, &runID, &startTime, &executionTime, &closeTime, &typeName, &status, &historyLength, &memo, &encoding, &taskList, &isCron, &numClusters) {
record := &persistence.InternalVisibilityWorkflowExecutionInfo{
WorkflowID: workflowID,
RunID: runID,
Expand All @@ -650,6 +659,7 @@ func readClosedWorkflowExecutionRecord(
Memo: persistence.NewDataBlob(memo, common.EncodingType(encoding)),
TaskList: taskList,
IsCron: isCron,
NumClusters: numClusters,
}
return record, true
}
Expand Down
Loading

0 comments on commit 87b2eae

Please sign in to comment.