Skip to content

Commit

Permalink
Adding auto-reset points to visibility record (cadence-workflow#2830)
Browse files Browse the repository at this point in the history
* Adding auto-reset points to visibility record

* avoid initializing map in the constructor
  • Loading branch information
Vitaly authored Nov 19, 2019
1 parent 5032b19 commit 88c6d1a
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 26 deletions.
24 changes: 13 additions & 11 deletions common/definition/indexedKeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@ import "github.com/uber/cadence/.gen/go/shared"

// valid indexed fields on ES
const (
DomainID = "DomainID"
WorkflowID = "WorkflowID"
RunID = "RunID"
WorkflowType = "WorkflowType"
StartTime = "StartTime"
ExecutionTime = "ExecutionTime"
CloseTime = "CloseTime"
CloseStatus = "CloseStatus"
HistoryLength = "HistoryLength"
Encoding = "Encoding"
KafkaKey = "KafkaKey"
DomainID = "DomainID"
WorkflowID = "WorkflowID"
RunID = "RunID"
WorkflowType = "WorkflowType"
StartTime = "StartTime"
ExecutionTime = "ExecutionTime"
CloseTime = "CloseTime"
CloseStatus = "CloseStatus"
HistoryLength = "HistoryLength"
Encoding = "Encoding"
KafkaKey = "KafkaKey"
BinaryChecksums = "BinaryChecksums"

CustomStringField = "CustomStringField"
CustomKeywordField = "CustomKeywordField"
Expand Down Expand Up @@ -65,6 +66,7 @@ func createDefaultIndexedKeys() map[string]interface{} {
CustomDoubleField: shared.IndexedValueTypeDouble,
CustomDatetimeField: shared.IndexedValueTypeDatetime,
CadenceChangeVersion: shared.IndexedValueTypeKeyword,
BinaryChecksums: shared.IndexedValueTypeKeyword,
}
for k, v := range systemIndexedKeys {
defaultIndexedKeys[k] = v
Expand Down
1 change: 1 addition & 0 deletions config/dynamicconfig/development_es.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ frontend.validSearchAttributes:
Operator: 1
RolloutID: 1
CadenceChangeVersion: 1
BinaryChecksums: 1
system.minRetentionDays:
- value: 0
18 changes: 14 additions & 4 deletions host/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (s *elasticsearchIntegrationSuite) TestListWorkflow_SearchAttribute() {
listRequest := &workflow.ListWorkflowExecutionsRequest{
Domain: common.StringPtr(s.domainName),
PageSize: common.Int32Ptr(int32(2)),
Query: common.StringPtr(fmt.Sprintf(`WorkflowType = '%s' and CloseTime = missing`, wt)),
Query: common.StringPtr(fmt.Sprintf(`WorkflowType = '%s' and CloseTime = missing and BinaryChecksums = 'binary-v1'`, wt)),
}
// verify upsert data is on ES
s.testListResultForUpsertSearchAttributes(listRequest)
Expand Down Expand Up @@ -970,18 +970,26 @@ func (s *elasticsearchIntegrationSuite) testListResultForUpsertSearchAttributes(
if len(resp.GetExecutions()) == 1 {
execution := resp.GetExecutions()[0]
retrievedSearchAttr := execution.SearchAttributes
if retrievedSearchAttr != nil && len(retrievedSearchAttr.GetIndexedFields()) == 2 {
if retrievedSearchAttr != nil && len(retrievedSearchAttr.GetIndexedFields()) == 3 {
fields := retrievedSearchAttr.GetIndexedFields()
searchValBytes := fields[s.testSearchAttributeKey]
var searchVal string
json.Unmarshal(searchValBytes, &searchVal)
err := json.Unmarshal(searchValBytes, &searchVal)
s.Nil(err)
s.Equal("another string", searchVal)

searchValBytes2 := fields[definition.CustomIntField]
var searchVal2 int
json.Unmarshal(searchValBytes2, &searchVal2)
err = json.Unmarshal(searchValBytes2, &searchVal2)
s.Nil(err)
s.Equal(123, searchVal2)

binaryChecksumsBytes := fields[definition.BinaryChecksums]
var binaryChecksums []string
err = json.Unmarshal(binaryChecksumsBytes, &binaryChecksums)
s.Nil(err)
s.Equal([]string{"binary-v1", "binary-v2"}, binaryChecksums)

verified = true
break
}
Expand All @@ -994,10 +1002,12 @@ func (s *elasticsearchIntegrationSuite) testListResultForUpsertSearchAttributes(
func getUpsertSearchAttributes() *workflow.SearchAttributes {
attrValBytes1, _ := json.Marshal("another string")
attrValBytes2, _ := json.Marshal(123)
binaryChecksums, _ := json.Marshal([]string{"binary-v1", "binary-v2"})
upsertSearchAttr := &workflow.SearchAttributes{
IndexedFields: map[string][]byte{
definition.CustomStringField: attrValBytes1,
definition.CustomIntField: attrValBytes2,
definition.BinaryChecksums: binaryChecksums,
},
}
return upsertSearchAttr
Expand Down
13 changes: 12 additions & 1 deletion host/testdata/es_index_template.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,23 @@
},
"Attr": {
"properties": {
"CadenceChangeVersion": { "type": "keyword" },
"CustomStringField": { "type": "text" },
"CustomKeywordField": { "type": "keyword"},
"CustomIntField": { "type": "long"},
"CustomBoolField": { "type": "boolean"},
"CustomDoubleField": { "type": "double"},
"CustomDatetimeField": { "type": "date"}
"CustomDatetimeField": { "type": "date"},
"project": { "type": "keyword"},
"service": { "type": "keyword"},
"environment": { "type": "keyword"},
"addon": { "type": "keyword"},
"addon-type": { "type": "keyword"},
"user": { "type": "keyword"},
"CustomDomain": { "type": "keyword"},
"Operator": { "type": "keyword"},
"RolloutID": { "type": "keyword"},
"BinaryChecksums": { "type": "keyword"}
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion schema/elasticsearch/visibility/index_template.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@
"user": { "type": "keyword"},
"CustomDomain": { "type": "keyword"},
"Operator": { "type": "keyword"},
"RolloutID": { "type": "keyword"}
"RolloutID": { "type": "keyword"},
"BinaryChecksums": { "type": "keyword"}
}
}
}
Expand Down
29 changes: 25 additions & 4 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package history

import (
"encoding/json"
"fmt"
"time"

Expand All @@ -33,6 +34,7 @@ import (
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/definition"
"github.com/uber/cadence/common/errors"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -1880,10 +1882,10 @@ func (e *mutableStateBuilder) CreateTransientDecisionEvents(
func (e *mutableStateBuilder) addBinaryCheckSumIfNotExists(
event *workflow.HistoryEvent,
maxResetPoints int,
) {
) error {
binChecksum := event.GetDecisionTaskCompletedEventAttributes().GetBinaryChecksum()
if len(binChecksum) == 0 {
return
return nil
}
exeInfo := e.executionInfo
var currResetPoints []*workflow.ResetPointInfo
Expand All @@ -1893,17 +1895,24 @@ func (e *mutableStateBuilder) addBinaryCheckSumIfNotExists(
currResetPoints = make([]*workflow.ResetPointInfo, 0, 1)
}

// List of all recent binary checksums associated with the workflow.
var recentBinaryChecksums []string

for _, rp := range currResetPoints {
recentBinaryChecksums = append(recentBinaryChecksums, rp.GetBinaryChecksum())
if rp.GetBinaryChecksum() == binChecksum {
// this checksum already exists
return
return nil
}
}

if len(currResetPoints) == maxResetPoints {
// if exceeding the max limit, do rotation by taking the oldest one out
// If exceeding the max limit, do rotation by taking the oldest one out.
currResetPoints = currResetPoints[1:]
recentBinaryChecksums = recentBinaryChecksums[1:]
}
// Adding current version of the binary checksum.
recentBinaryChecksums = append(recentBinaryChecksums, binChecksum)

resettable := true
err := e.CheckResettable()
Expand All @@ -1921,6 +1930,18 @@ func (e *mutableStateBuilder) addBinaryCheckSumIfNotExists(
exeInfo.AutoResetPoints = &workflow.ResetPoints{
Points: currResetPoints,
}
bytes, err := json.Marshal(recentBinaryChecksums)
if err != nil {
return err
}
if exeInfo.SearchAttributes == nil {
exeInfo.SearchAttributes = make(map[string][]byte)
}
exeInfo.SearchAttributes[definition.BinaryChecksums] = bytes
if e.shard.GetConfig().AdvancedVisibilityWritingMode() != common.AdvancedVisibilityWritingModeOff {
return e.taskGenerator.generateWorkflowSearchAttrTasks(e.unixNanoToTime(event.GetTimestamp()))
}
return nil
}

// TODO: we will release the restriction when reset API allow those pending
Expand Down
12 changes: 7 additions & 5 deletions service/history/mutableStateDecisionTaskManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,7 @@ func (m *mutableStateDecisionTaskManagerImpl) ReplicateDecisionTaskCompletedEven
event *workflow.HistoryEvent,
) error {
m.beforeAddDecisionTaskCompletedEvent()
m.afterAddDecisionTaskCompletedEvent(event, math.MaxInt32)
return nil
return m.afterAddDecisionTaskCompletedEvent(event, math.MaxInt32)
}

func (m *mutableStateDecisionTaskManagerImpl) ReplicateDecisionTaskFailedEvent() error {
Expand Down Expand Up @@ -477,7 +476,10 @@ func (m *mutableStateDecisionTaskManagerImpl) AddDecisionTaskCompletedEvent(
// Now write the completed event
event := m.msb.hBuilder.AddDecisionTaskCompletedEvent(scheduleEventID, startedEventID, request)

m.afterAddDecisionTaskCompletedEvent(event, maxResetPoints)
err := m.afterAddDecisionTaskCompletedEvent(event, maxResetPoints)
if err != nil {
return nil, err
}
return event, nil
}

Expand Down Expand Up @@ -721,7 +723,7 @@ func (m *mutableStateDecisionTaskManagerImpl) beforeAddDecisionTaskCompletedEven
func (m *mutableStateDecisionTaskManagerImpl) afterAddDecisionTaskCompletedEvent(
event *workflow.HistoryEvent,
maxResetPoints int,
) {
) error {
m.msb.executionInfo.LastProcessedEvent = event.GetDecisionTaskCompletedEventAttributes().GetStartedEventId()
m.msb.addBinaryCheckSumIfNotExists(event, maxResetPoints)
return m.msb.addBinaryCheckSumIfNotExists(event, maxResetPoints)
}

0 comments on commit 88c6d1a

Please sign in to comment.