Skip to content

Commit

Permalink
mutable state checksums part#2: persistence changes (cadence-workflow…
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Jan 13, 2020
1 parent 95786aa commit 44cdb75
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 11 deletions.
28 changes: 21 additions & 7 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ const (
`created_time: ? ` +
`}`

templateChecksumType = `{` +
`version: ?, ` +
`flavor: ?, ` +
`value: ? ` +
`}`

templateCreateShardQuery = `INSERT INTO executions (` +
`shard_id, type, domain_id, workflow_id, run_id, visibility_ts, task_id, shard, range_id)` +
`VALUES(?, ?, ?, ?, ?, ?, ?, ` + templateShardType + `, ?) IF NOT EXISTS`
Expand Down Expand Up @@ -374,16 +380,17 @@ workflow_state = ? ` +
`VALUES(?, ?, ?, ?, ?, ?, ?, ?, {run_id: ?, create_request_id: ?, state: ?, close_status: ?}, {start_version: ?, last_write_version: ?}, ?, ?) IF NOT EXISTS USING TTL 0 `

templateCreateWorkflowExecutionQuery = `INSERT INTO executions (` +
`shard_id, domain_id, workflow_id, run_id, type, execution, next_event_id, visibility_ts, task_id) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateWorkflowExecutionType + `, ?, ?, ?) IF NOT EXISTS `
`shard_id, domain_id, workflow_id, run_id, type, execution, next_event_id, visibility_ts, task_id, checksum) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateWorkflowExecutionType + `, ?, ?, ?, ` + templateChecksumType + `) IF NOT EXISTS `

templateCreateWorkflowExecutionWithReplicationQuery = `INSERT INTO executions (` +
`shard_id, domain_id, workflow_id, run_id, type, execution, replication_state, next_event_id, visibility_ts, task_id) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateWorkflowExecutionType + `, ` + templateReplicationStateType + `, ?, ?, ?) IF NOT EXISTS `
`shard_id, domain_id, workflow_id, run_id, type, execution, replication_state, next_event_id, visibility_ts, task_id, checksum) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateWorkflowExecutionType + `, ` + templateReplicationStateType +
`, ?, ?, ?, ` + templateChecksumType + `) IF NOT EXISTS `

templateCreateWorkflowExecutionWithVersionHistoriesQuery = `INSERT INTO executions (` +
`shard_id, domain_id, workflow_id, run_id, type, execution, next_event_id, visibility_ts, task_id, version_histories, version_histories_encoding) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateWorkflowExecutionType + `, ?, ?, ?, ?, ?) IF NOT EXISTS `
`shard_id, domain_id, workflow_id, run_id, type, execution, next_event_id, visibility_ts, task_id, version_histories, version_histories_encoding, checksum) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateWorkflowExecutionType + `, ?, ?, ?, ?, ?, ` + templateChecksumType + `) IF NOT EXISTS `

templateCreateTransferTaskQuery = `INSERT INTO executions (` +
`shard_id, type, domain_id, workflow_id, run_id, transfer, visibility_ts, task_id) ` +
Expand All @@ -408,7 +415,9 @@ workflow_state = ? ` +
`and task_id = ? ` +
`IF range_id = ?`

templateGetWorkflowExecutionQuery = `SELECT execution, replication_state, activity_map, timer_map, child_executions_map, request_cancel_map, signal_map, signal_requested, buffered_events_list, buffered_replication_tasks_map, version_histories, version_histories_encoding ` +
templateGetWorkflowExecutionQuery = `SELECT execution, replication_state, activity_map, timer_map, ` +
`child_executions_map, request_cancel_map, signal_map, signal_requested, buffered_events_list, ` +
`buffered_replication_tasks_map, version_histories, version_histories_encoding, checksum ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand Down Expand Up @@ -442,6 +451,7 @@ workflow_state = ? ` +
templateUpdateWorkflowExecutionQuery = `UPDATE executions ` +
`SET execution = ` + templateWorkflowExecutionType +
`, next_event_id = ? ` +
`, checksum = ` + templateChecksumType +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand All @@ -455,6 +465,7 @@ workflow_state = ? ` +
`SET execution = ` + templateWorkflowExecutionType +
`, replication_state = ` + templateReplicationStateType +
`, next_event_id = ? ` +
`, checksum = ` + templateChecksumType +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand All @@ -469,6 +480,7 @@ workflow_state = ? ` +
`, next_event_id = ? ` +
`, version_histories = ? ` +
`, version_histories_encoding = ? ` +
`, checksum = ` + templateChecksumType +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand Down Expand Up @@ -1357,6 +1369,8 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *p.GetWorkflowExecut
}
state.BufferedEvents = bufferedEventsBlobs

state.Checksum = createChecksum(result["checksum"].(map[string]interface{}))

return &p.InternalGetWorkflowExecutionResponse{State: state}, nil
}

Expand Down
48 changes: 45 additions & 3 deletions common/persistence/cassandra/cassandraPersistenceUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/checksum"
p "github.com/uber/cadence/common/persistence"
)

Expand Down Expand Up @@ -55,6 +56,7 @@ func applyWorkflowMutationBatch(
versionHistories,
cqlNowTimestampMillis,
condition,
workflowMutation.Checksum,
); err != nil {
return err
}
Expand Down Expand Up @@ -170,6 +172,7 @@ func applyWorkflowSnapshotBatchAsReset(
versionHistories,
cqlNowTimestampMillis,
condition,
workflowSnapshot.Checksum,
); err != nil {
return err
}
Expand Down Expand Up @@ -274,6 +277,7 @@ func applyWorkflowSnapshotBatchAsNew(
executionInfo,
replicationState,
versionHistories,
workflowSnapshot.Checksum,
cqlNowTimestampMillis,
); err != nil {
return err
Expand Down Expand Up @@ -362,6 +366,7 @@ func createExecution(
executionInfo *p.InternalWorkflowExecutionInfo,
replicationState *p.ReplicationState,
versionHistories *p.DataBlob,
checksum checksum.Checksum,
cqlNowTimestampMillis int64,
) error {

Expand Down Expand Up @@ -460,7 +465,10 @@ func createExecution(
executionInfo.Memo,
executionInfo.NextEventID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID)
rowTypeExecutionTaskID,
checksum.Version,
checksum.Flavor,
checksum.Value)
} else if versionHistories != nil {
// TODO also need to set the start / current / last write version
versionHistoriesData, versionHistoriesEncoding := p.FromDataBlob(versionHistories)
Expand Down Expand Up @@ -532,7 +540,10 @@ func createExecution(
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
versionHistoriesData,
versionHistoriesEncoding)
versionHistoriesEncoding,
checksum.Version,
checksum.Flavor,
checksum.Value)
} else if replicationState != nil {
lastReplicationInfo := make(map[string]map[string]interface{})
for k, v := range replicationState.LastReplicationInfo {
Expand Down Expand Up @@ -610,7 +621,10 @@ func createExecution(
lastReplicationInfo,
executionInfo.NextEventID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID)
rowTypeExecutionTaskID,
checksum.Version,
checksum.Flavor,
checksum.Value)
} else {
return &workflow.InternalServiceError{
Message: fmt.Sprintf("Create workflow execution with both version histories and replication state."),
Expand All @@ -627,6 +641,7 @@ func updateExecution(
versionHistories *p.DataBlob,
cqlNowTimestampMillis int64,
condition int64,
checksum checksum.Checksum,
) error {

// validate workflow state & close status
Expand Down Expand Up @@ -717,6 +732,9 @@ func updateExecution(
executionInfo.SearchAttributes,
executionInfo.Memo,
executionInfo.NextEventID,
checksum.Version,
checksum.Flavor,
checksum.Value,
shardID,
rowTypeExecution,
domainID,
Expand Down Expand Up @@ -790,6 +808,9 @@ func updateExecution(
executionInfo.NextEventID,
versionHistoriesData,
versionHistoriesEncoding,
checksum.Version,
checksum.Flavor,
checksum.Value,
shardID,
rowTypeExecution,
domainID,
Expand Down Expand Up @@ -869,6 +890,9 @@ func updateExecution(
replicationState.LastWriteEventID,
lastReplicationInfo,
executionInfo.NextEventID,
checksum.Version,
checksum.Flavor,
checksum.Value,
shardID,
rowTypeExecution,
domainID,
Expand Down Expand Up @@ -2500,6 +2524,24 @@ func createReplicationInfoMap(
return rInfoMap
}

func createChecksum(result map[string]interface{}) checksum.Checksum {
csum := checksum.Checksum{}
if len(result) == 0 {
return csum
}
for k, v := range result {
switch k {
case "flavor":
csum.Flavor = checksum.Flavor(v.(int))
case "version":
csum.Version = v.(int)
case "value":
csum.Value = v.([]byte)
}
}
return csum
}

func isTimeoutError(err error) bool {
if err == gocql.ErrTimeoutNoResponse {
return true
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (m *executionManagerImpl) GetWorkflowExecution(
SignalInfos: response.State.SignalInfos,
SignalRequestedIDs: response.State.SignalRequestedIDs,
ReplicationState: response.State.ReplicationState,
Checksum: response.State.Checksum,
},
}

Expand Down Expand Up @@ -672,6 +673,7 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
TimerTasks: input.TimerTasks,

Condition: input.Condition,
Checksum: input.Checksum,
}, nil
}

Expand Down Expand Up @@ -729,6 +731,7 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot(
TimerTasks: input.TimerTasks,

Condition: input.Condition,
Checksum: input.Checksum,
}, nil
}

Expand Down
Loading

0 comments on commit 44cdb75

Please sign in to comment.