Skip to content

Commit

Permalink
Auto reset open workflow by binaryChecksum : persistence for execution (
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Apr 29, 2019
1 parent 41e7412 commit d5045b5
Show file tree
Hide file tree
Showing 20 changed files with 1,814 additions and 50 deletions.
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

1,187 changes: 1,181 additions & 6 deletions .gen/go/shared/types.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions .gen/go/sqlblobs/idl.go

Large diffs are not rendered by default.

180 changes: 176 additions & 4 deletions .gen/go/sqlblobs/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 20 additions & 2 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ const (
`client_library_version: ?, ` +
`client_feature_version: ?, ` +
`client_impl: ?, ` +
`auto_reset_points: ?, ` +
`auto_reset_points_encoding: ?, ` +
`attempt: ?, ` +
`has_retry_policy: ?, ` +
`init_interval: ?, ` +
Expand Down Expand Up @@ -1105,7 +1107,7 @@ func (d *cassandraPersistence) UpdateShard(request *p.UpdateShardRequest) error
return nil
}

func (d *cassandraPersistence) CreateWorkflowExecution(request *p.CreateWorkflowExecutionRequest) (
func (d *cassandraPersistence) CreateWorkflowExecution(request *p.InternalCreateWorkflowExecutionRequest) (
*p.CreateWorkflowExecutionResponse, error) {
cqlNowTimestamp := p.UnixNanoToDBTimestamp(time.Now().UnixNano())
batch := d.session.NewBatch(gocql.LoggedBatch)
Expand Down Expand Up @@ -1242,7 +1244,7 @@ func (d *cassandraPersistence) CreateWorkflowExecution(request *p.CreateWorkflow
return &p.CreateWorkflowExecutionResponse{}, nil
}

func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *p.CreateWorkflowExecutionRequest,
func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *p.InternalCreateWorkflowExecutionRequest,
batch *gocql.Batch, cqlNowTimestamp int64) {

parentDomainID := emptyDomainID
Expand Down Expand Up @@ -1388,6 +1390,8 @@ func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *p.Cre
"", // client_library_version
"", // client_feature_version
"", // client_impl
request.PreviousAutoResetPoints.Data,
request.PreviousAutoResetPoints.GetEncoding(),
request.Attempt,
request.HasRetryPolicy,
request.InitialInterval,
Expand Down Expand Up @@ -1455,6 +1459,8 @@ func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *p.Cre
"", // client_library_version
"", // client_feature_version
"", // client_impl
request.PreviousAutoResetPoints.Data,
request.PreviousAutoResetPoints.GetEncoding(),
request.Attempt,
request.HasRetryPolicy,
request.InitialInterval,
Expand Down Expand Up @@ -1643,6 +1649,8 @@ func (d *cassandraPersistence) updateMutableState(batch *gocql.Batch, executionI
executionInfo.ClientLibraryVersion,
executionInfo.ClientFeatureVersion,
executionInfo.ClientImpl,
executionInfo.AutoResetPoints.Data,
executionInfo.AutoResetPoints.GetEncoding(),
executionInfo.Attempt,
executionInfo.HasRetryPolicy,
executionInfo.InitialInterval,
Expand Down Expand Up @@ -1710,6 +1718,8 @@ func (d *cassandraPersistence) updateMutableState(batch *gocql.Batch, executionI
executionInfo.ClientLibraryVersion,
executionInfo.ClientFeatureVersion,
executionInfo.ClientImpl,
executionInfo.AutoResetPoints.Data,
executionInfo.AutoResetPoints.GetEncoding(),
executionInfo.Attempt,
executionInfo.HasRetryPolicy,
executionInfo.InitialInterval,
Expand Down Expand Up @@ -3732,6 +3742,9 @@ func createWorkflowExecutionInfo(result map[string]interface{}) *p.InternalWorkf
info := &p.InternalWorkflowExecutionInfo{}
var completionEventData []byte
var completionEventEncoding common.EncodingType
var autoResetPoints []byte
var autoResetPointsEncoding common.EncodingType

for k, v := range result {
switch k {
case "domain_id":
Expand All @@ -3754,6 +3767,10 @@ func createWorkflowExecutionInfo(result map[string]interface{}) *p.InternalWorkf
completionEventData = v.([]byte)
case "completion_event_data_encoding":
completionEventEncoding = common.EncodingType(v.(string))
case "auto_reset_points":
autoResetPoints = v.([]byte)
case "auto_reset_points_encoding":
autoResetPointsEncoding = common.EncodingType(v.(string))
case "task_list":
info.TaskList = v.(string)
case "workflow_type_name":
Expand Down Expand Up @@ -3841,6 +3858,7 @@ func createWorkflowExecutionInfo(result map[string]interface{}) *p.InternalWorkf
}
}
info.CompletionEvent = p.NewDataBlob(completionEventData, completionEventEncoding)
info.AutoResetPoints = p.NewDataBlob(autoResetPoints, autoResetPointsEncoding)
return info
}

Expand Down
Loading

0 comments on commit d5045b5

Please sign in to comment.