Skip to content

Commit

Permalink
Refactor memo to use mutable state (cadence-workflow#2288)
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu authored Jul 26, 2019
1 parent 6404088 commit 7ea02e6
Show file tree
Hide file tree
Showing 19 changed files with 111 additions and 15 deletions.
50 changes: 46 additions & 4 deletions .gen/go/sqlblobs/sqlblobs.go

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ const (
`branch_token: ?, ` +
`cron_schedule: ?, ` +
`expiration_seconds: ?, ` +
`search_attributes: ? ` +
`search_attributes: ?, ` +
`memo: ? ` +
`}`

templateReplicationStateType = `{` +
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/cassandra/cassandraPersistenceUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ func createExecution(
executionInfo.CronSchedule,
executionInfo.ExpirationSeconds,
executionInfo.SearchAttributes,
executionInfo.Memo,
executionInfo.NextEventID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID)
Expand Down Expand Up @@ -519,6 +520,7 @@ func createExecution(
executionInfo.CronSchedule,
executionInfo.ExpirationSeconds,
executionInfo.SearchAttributes,
executionInfo.Memo,
replicationState.CurrentVersion,
replicationState.StartVersion,
replicationState.LastWriteVersion,
Expand Down Expand Up @@ -625,6 +627,7 @@ func updateExecution(
executionInfo.CronSchedule,
executionInfo.ExpirationSeconds,
executionInfo.SearchAttributes,
executionInfo.Memo,
executionInfo.NextEventID,
shardID,
rowTypeExecution,
Expand Down Expand Up @@ -697,6 +700,7 @@ func updateExecution(
executionInfo.CronSchedule,
executionInfo.ExpirationSeconds,
executionInfo.SearchAttributes,
executionInfo.Memo,
replicationState.CurrentVersion,
replicationState.StartVersion,
replicationState.LastWriteVersion,
Expand Down Expand Up @@ -1758,6 +1762,8 @@ func createWorkflowExecutionInfo(
info.ExpirationSeconds = int32(v.(int))
case "search_attributes":
info.SearchAttributes = v.(map[string][]byte)
case "memo":
info.Memo = v.(map[string][]byte)
}
}
info.CompletionEvent = p.NewDataBlob(completionEventData, completionEventEncoding)
Expand Down
1 change: 1 addition & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ type (
ClientFeatureVersion string
ClientImpl string
AutoResetPoints *workflow.ResetPoints
Memo map[string][]byte
SearchAttributes map[string][]byte
// for retry
Attempt int32
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func (m *executionManagerImpl) DeserializeExecutionInfo(
ExpirationSeconds: info.ExpirationSeconds,
AutoResetPoints: autoResetPoints,
SearchAttributes: info.SearchAttributes,
Memo: info.Memo,
}
newStats := &ExecutionStats{
HistorySize: info.HistorySize,
Expand Down Expand Up @@ -478,6 +479,7 @@ func (m *executionManagerImpl) SerializeExecutionInfo(
BranchToken: info.BranchToken,
CronSchedule: info.CronSchedule,
ExpirationSeconds: info.ExpirationSeconds,
Memo: info.Memo,
SearchAttributes: info.SearchAttributes,

// attributes which are not related to mutable state
Expand Down
27 changes: 26 additions & 1 deletion common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,12 @@ func (s *ExecutionManagerSuite) TestGetWorkflow() {
testSearchAttrKey: testSearchAttrVal,
}

testMemoKey := "memoKey"
testMemoVal, _ := json.Marshal("memoVal")
testMemo := map[string][]byte{
testMemoKey: testMemoVal,
}

createReq := &p.CreateWorkflowExecutionRequest{
NewWorkflowSnapshot: p.WorkflowSnapshot{
ExecutionInfo: &p.WorkflowExecutionInfo{
Expand Down Expand Up @@ -847,6 +853,7 @@ func (s *ExecutionManagerSuite) TestGetWorkflow() {
ExpirationSeconds: rand.Int31(),
AutoResetPoints: &testResetPoints,
SearchAttributes: testSearchAttr,
Memo: testMemo,
},
ExecutionStats: &p.ExecutionStats{
HistorySize: int64(rand.Int31()),
Expand Down Expand Up @@ -914,6 +921,9 @@ func (s *ExecutionManagerSuite) TestGetWorkflow() {
val, ok := info.SearchAttributes[testSearchAttrKey]
s.True(ok)
s.Equal(testSearchAttrVal, val)
val, ok = info.Memo[testMemoKey]
s.True(ok)
s.Equal(testMemoVal, val)

s.Equal(createReq.NewWorkflowSnapshot.ReplicationState.LastWriteEventID, state.ReplicationState.LastWriteEventID)
s.Equal(createReq.NewWorkflowSnapshot.ReplicationState.LastWriteVersion, state.ReplicationState.LastWriteVersion)
Expand Down Expand Up @@ -972,6 +982,7 @@ func (s *ExecutionManagerSuite) TestUpdateWorkflow() {
s.Equal(int32(0), info0.SignalCount)
s.True(info0.AutoResetPoints.Equals(&gen.ResetPoints{}))
s.True(len(info0.SearchAttributes) == 0)
s.True(len(info0.Memo) == 0)

log.Infof("Workflow execution last updated: %v", info0.LastUpdatedTimestamp)

Expand Down Expand Up @@ -1000,6 +1011,9 @@ func (s *ExecutionManagerSuite) TestUpdateWorkflow() {
searchAttrKey := "env"
searchAttrVal := []byte("test")
updatedInfo.SearchAttributes = map[string][]byte{searchAttrKey: searchAttrVal}
memoKey := "memoKey"
memoVal := []byte("memoVal")
updatedInfo.Memo = map[string][]byte{memoKey: memoVal}
updatedStats.HistorySize = math.MaxInt64

err2 := s.UpdateWorkflowExecution(updatedInfo, updatedStats, []int64{int64(4)}, nil, int64(3), nil, nil, nil, nil, nil)
Expand Down Expand Up @@ -1047,6 +1061,9 @@ func (s *ExecutionManagerSuite) TestUpdateWorkflow() {
searchAttrVal1, ok := info1.SearchAttributes[searchAttrKey]
s.True(ok)
s.Equal(searchAttrVal, searchAttrVal1)
memoVal1, ok := info1.Memo[memoKey]
s.True(ok)
s.Equal(memoVal, memoVal1)

log.Infof("Workflow execution last updated: %v", info1.LastUpdatedTimestamp)

Expand Down Expand Up @@ -1093,7 +1110,9 @@ func (s *ExecutionManagerSuite) TestUpdateWorkflow() {
searchAttrVal2, ok := info2.SearchAttributes[searchAttrKey]
s.True(ok)
s.Equal(searchAttrVal, searchAttrVal2)

memoVal2, ok := info1.Memo[memoKey]
s.True(ok)
s.Equal(memoVal, memoVal2)
log.Infof("Workflow execution last updated: %v", info2.LastUpdatedTimestamp)

err5 := s.UpdateWorkflowExecutionWithRangeID(failedUpdateInfo, failedUpdateStats, []int64{int64(5)}, nil, int64(12345), int64(5), nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, "")
Expand Down Expand Up @@ -1137,6 +1156,9 @@ func (s *ExecutionManagerSuite) TestUpdateWorkflow() {
searchAttrVal3, ok := info3.SearchAttributes[searchAttrKey]
s.True(ok)
s.Equal(searchAttrVal, searchAttrVal3)
memoVal3, ok := info1.Memo[memoKey]
s.True(ok)
s.Equal(memoVal, memoVal3)

log.Infof("Workflow execution last updated: %v", info3.LastUpdatedTimestamp)

Expand Down Expand Up @@ -1181,6 +1203,9 @@ func (s *ExecutionManagerSuite) TestUpdateWorkflow() {
searchAttrVal4, ok := info4.SearchAttributes[searchAttrKey]
s.True(ok)
s.Equal(searchAttrVal, searchAttrVal4)
memoVal4, ok := info1.Memo[memoKey]
s.True(ok)
s.Equal(memoVal, memoVal4)

log.Infof("Workflow execution last updated: %v", info4.LastUpdatedTimestamp)
}
Expand Down
1 change: 1 addition & 0 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ type (
BranchToken []byte
CronSchedule string
ExpirationSeconds int32
Memo map[string][]byte
SearchAttributes map[string][]byte

// attributes which are not related to mutable state at all
Expand Down
1 change: 1 addition & 0 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ func (m *sqlExecutionManager) GetWorkflowExecution(
ExecutionContext: info.GetExecutionContext(),
NonRetriableErrors: info.GetRetryNonRetryableErrors(),
SearchAttributes: info.GetSearchAttributes(),
Memo: info.GetMemo(),
}

if info.LastWriteEventID != nil {
Expand Down
1 change: 1 addition & 0 deletions common/persistence/sql/sqlExecutionManagerUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,7 @@ func buildExecutionRow(
AutoResetPoints: executionInfo.AutoResetPoints.Data,
AutoResetPointsEncoding: common.StringPtr(string(executionInfo.AutoResetPoints.GetEncoding())),
SearchAttributes: executionInfo.SearchAttributes,
Memo: executionInfo.Memo,
}

completionEvent := executionInfo.CompletionEvent
Expand Down
1 change: 1 addition & 0 deletions idl/github.com/uber/cadence/sqlblobs.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ struct WorkflowExecutionInfo {
115: optional binary autoResetPoints
116: optional string autoResetPointsEncoding
118: optional map<string, binary> searchAttributes
120: optional map<string, binary> memo
}

struct ActivityInfo {
Expand Down
3 changes: 2 additions & 1 deletion schema/cassandra/cadence/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ CREATE TYPE workflow_execution (
last_event_task_id bigint,
auto_reset_points blob, -- the resetting points for auto-reset feature
auto_reset_points_encoding text, -- encoding for auto_reset_points_data
search_attributes map<text, blob>
search_attributes map<text, blob>,
memo map<text, blob>
);

-- Replication information for each cluster
Expand Down
8 changes: 8 additions & 0 deletions schema/cassandra/cadence/versioned/v0.20/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"CurrVersion": "0.20",
"MinCompatibleVersion": "0.20",
"Description": "Add memo to mutable state",
"SchemaUpdateCqlFiles": [
"memo.cql"
]
}
1 change: 1 addition & 0 deletions schema/cassandra/cadence/versioned/v0.20/memo.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TYPE workflow_execution ADD memo map<text, blob>;
2 changes: 2 additions & 0 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5235,6 +5235,8 @@ func copyWorkflowExecutionInfo(sourceInfo *persistence.WorkflowExecutionInfo) *p
ClientFeatureVersion: sourceInfo.ClientFeatureVersion,
ClientImpl: sourceInfo.ClientImpl,
AutoResetPoints: sourceInfo.AutoResetPoints,
Memo: sourceInfo.Memo,
SearchAttributes: sourceInfo.SearchAttributes,
Attempt: sourceInfo.Attempt,
HasRetryPolicy: sourceInfo.HasRetryPolicy,
InitialInterval: sourceInfo.InitialInterval,
Expand Down
3 changes: 3 additions & 0 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1331,6 +1331,9 @@ func (e *mutableStateBuilder) ReplicateWorkflowExecutionStartedEvent(
domainEntry.GetRetentionDays(e.executionInfo.WorkflowID),
)

if event.Memo != nil {
e.executionInfo.Memo = event.Memo.GetFields()
}
if event.SearchAttributes != nil {
e.executionInfo.SearchAttributes = event.SearchAttributes.GetIndexedFields()
}
Expand Down
4 changes: 2 additions & 2 deletions service/history/transferQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func (t *transferQueueActiveProcessorImpl) processCloseExecution(
return &workflow.InternalServiceError{Message: "Unable to get workflow start event."}
}
workflowExecutionTimestamp := getWorkflowExecutionTimestamp(msBuilder, startEvent)
visibilityMemo := getVisibilityMemo(startEvent)
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := executionInfo.SearchAttributes

// release the context lock since we no longer need mutable state builder and
Expand Down Expand Up @@ -943,7 +943,7 @@ func (t *transferQueueActiveProcessorImpl) processRecordWorkflowStartedOrUpsertH
return &workflow.InternalServiceError{Message: "Failed to load start event."}
}
executionTimestamp := getWorkflowExecutionTimestamp(msBuilder, startEvent)
visibilityMemo := getVisibilityMemo(startEvent)
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := copySearchAttributes(executionInfo.SearchAttributes)

// release the context lock since we no longer need mutable state builder and
Expand Down
6 changes: 3 additions & 3 deletions service/history/transferQueueProcessorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,9 @@ func getWorkflowExecutionTimestamp(msBuilder mutableState, startEvent *workflow.
return executionTimestamp
}

func getVisibilityMemo(startEvent *workflow.HistoryEvent) *workflow.Memo {
if startEvent == nil {
func getWorkflowMemo(memo map[string][]byte) *workflow.Memo {
if memo == nil {
return nil
}
return startEvent.WorkflowExecutionStartedEventAttributes.Memo
return &workflow.Memo{Fields: memo}
}
4 changes: 2 additions & 2 deletions service/history/transferQueueStandbyProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (t *transferQueueStandbyProcessorImpl) processCloseExecution(
return &workflow.InternalServiceError{Message: "Failed to load start event."}
}
workflowExecutionTimestamp := getWorkflowExecutionTimestamp(msBuilder, startEvent)
visibilityMemo := getVisibilityMemo(startEvent)
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := executionInfo.SearchAttributes

ok, err := verifyTaskVersion(t.shard, t.logger, transferTask.DomainID, msBuilder.GetLastWriteVersion(), transferTask.Version, transferTask)
Expand Down Expand Up @@ -512,7 +512,7 @@ func (t *transferQueueStandbyProcessorImpl) processRecordWorkflowStartedOrUpsert
return &workflow.InternalServiceError{Message: "Failed to load start event."}
}
executionTimestamp := getWorkflowExecutionTimestamp(msBuilder, startEvent)
visibilityMemo := getVisibilityMemo(startEvent)
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := copySearchAttributes(executionInfo.SearchAttributes)

if isRecordStart {
Expand Down
2 changes: 1 addition & 1 deletion tools/cassandra/updateTask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,5 @@ func (s *UpdateSchemaTestSuite) TestDryrun() {
s.Nil(err)
defer client.Close()
dir := "../../schema/cassandra/cadence/versioned"
s.RunDryrunTest(buildCLIOptions(), client, "-k", dir, "0.19")
s.RunDryrunTest(buildCLIOptions(), client, "-k", dir, "0.20")
}

0 comments on commit 7ea02e6

Please sign in to comment.