Skip to content

Commit

Permalink
Support for out of order processing of replication tasks (cadence-wo…
Browse files Browse the repository at this point in the history
…rkflow#693)

* persistence layer support for buffered replication tasks

* reorder working

* unit test of persistence support for buffered replication tasks

* Support for out of order processing of replication tasks

Schema changes to buffer replication tasks as part of mutable state.
Persistence layer changes to provide support for adding new replication
tasks, and deleting replication tasks after they are processed and
applied to workflow execution.

MutableState change to support CRUD of buffered replication tasks.  And
HistoryReplicator changes to detect out-of-order events and instead add
replication task to the buffer.  Also loop through all replication task
and flush any buffered tasks after processing of an update.
  • Loading branch information
samarabbas authored and wxing1292 committed Apr 25, 2018
1 parent 021330b commit 99f36c9
Show file tree
Hide file tree
Showing 13 changed files with 727 additions and 144 deletions.
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ before_install:

install:
- go get -u github.com/Masterminds/glide
# remove the mkdir and git checkout part when https://github.com/golang/lint/issues/397 is fixed
- mkdir -p $GOPATH/src/golang.org/x && git clone https://github.com/golang/lint.git $GOPATH/src/golang.org/x/lint && go get -u golang.org/x/lint/golint
- go get -u golang.org/x/lint/golint
- go get github.com/axw/gocov/gocov
- go get github.com/mattn/goveralls
- go get golang.org/x/tools/cmd/cover
Expand Down
148 changes: 147 additions & 1 deletion common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,32 @@ const (
`control: ?` +
`}`

templateBufferedReplicationTaskInfoType = `{` +
`first_event_id: ?, ` +
`next_event_id: ?, ` +
`version: ?, ` +
`history: ` + templateSerializedEventBatch + `, ` +
`new_run_history: ` + templateSerializedEventBatch + ` ` +
`}`

templateBufferedReplicationTaskInfoNoNewRunHistoryType = `{` +
`first_event_id: ?, ` +
`next_event_id: ?, ` +
`version: ?, ` +
`history: ` + templateSerializedEventBatch + ` ` +
`}`

templateReplicationInfoType = `{` +
`version: ?, ` +
`last_event_id: ?` +
`}`

templateSerializedEventBatch = `{` +
`encoding_type: ?, ` +
`version: ?, ` +
`data: ?` +
`}`

templateTaskListType = `{` +
`domain_id: ?, ` +
`name: ?, ` +
Expand Down Expand Up @@ -334,7 +355,7 @@ const (
`and task_id = ? ` +
`IF range_id = ?`

templateGetWorkflowExecutionQuery = `SELECT execution, replication_state, activity_map, timer_map, child_executions_map, request_cancel_map, signal_map, signal_requested, buffered_events_list ` +
templateGetWorkflowExecutionQuery = `SELECT execution, replication_state, activity_map, timer_map, child_executions_map, request_cancel_map, signal_map, signal_requested, buffered_events_list, buffered_replication_tasks_map ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand Down Expand Up @@ -523,6 +544,39 @@ const (
`and task_id = ? ` +
`IF next_event_id = ?`

templateUpdateBufferedReplicationTasksQuery = `UPDATE executions ` +
`SET buffered_replication_tasks_map[ ? ] =` + templateBufferedReplicationTaskInfoType + ` ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id = ? ` +
`IF next_event_id = ?`

templateUpdateBufferedReplicationTasksNoNewRunHistoryQuery = `UPDATE executions ` +
`SET buffered_replication_tasks_map[ ? ] =` + templateBufferedReplicationTaskInfoNoNewRunHistoryType + ` ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id = ? ` +
`IF next_event_id = ?`

templateDeleteBufferedReplicationTaskQuery = `DELETE buffered_replication_tasks_map[ ? ] ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id = ? ` +
`IF next_event_id = ?`

templateDeleteWorkflowExecutionMutableStateQuery = `DELETE FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand Down Expand Up @@ -1214,6 +1268,14 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *GetWorkflowExecutio
}
state.BufferedEvents = bufferedEvents

bufferedReplicationTasks := make(map[int64]*BufferedReplicationTask)
bufferedRTMap := result["buffered_replication_tasks_map"].(map[int64]map[string]interface{})
for k, v := range bufferedRTMap {
info := createBufferedReplicationTaskInfo(v)
bufferedReplicationTasks[k] = info
}
state.BufferedReplicationTasks = bufferedReplicationTasks

return &GetWorkflowExecutionResponse{State: state}, nil
}

Expand Down Expand Up @@ -1356,6 +1418,9 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx
d.updateBufferedEvents(batch, request.NewBufferedEvents, request.ClearBufferedEvents,
executionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID, request.Condition, request.RangeID)

d.updateBufferedReplicationTasks(batch, request.NewBufferedReplicationTask, request.DeleteBufferedReplicationTask,
executionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID, request.Condition, request.RangeID)

if request.ContinueAsNew != nil {
startReq := request.ContinueAsNew
d.CreateWorkflowExecutionWithinBatch(startReq, batch, cqlNowTimestamp)
Expand Down Expand Up @@ -2475,6 +2540,65 @@ func (d *cassandraPersistence) updateBufferedEvents(batch *gocql.Batch, newBuffe
}
}

func (d *cassandraPersistence) updateBufferedReplicationTasks(batch *gocql.Batch, newBufferedReplicationTask *BufferedReplicationTask,
deleteInfo *int64, domainID, workflowID, runID string, condition int64, rangeID int64) {

if newBufferedReplicationTask != nil {
if newBufferedReplicationTask.NewRunHistory != nil {
batch.Query(templateUpdateBufferedReplicationTasksQuery,
newBufferedReplicationTask.FirstEventID,
newBufferedReplicationTask.FirstEventID,
newBufferedReplicationTask.NextEventID,
newBufferedReplicationTask.Version,
newBufferedReplicationTask.History.EncodingType,
newBufferedReplicationTask.History.Version,
newBufferedReplicationTask.History.Data,
newBufferedReplicationTask.NewRunHistory.EncodingType,
newBufferedReplicationTask.NewRunHistory.Version,
newBufferedReplicationTask.NewRunHistory.Data,
d.shardID,
rowTypeExecution,
domainID,
workflowID,
runID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
condition)
} else {
batch.Query(templateUpdateBufferedReplicationTasksNoNewRunHistoryQuery,
newBufferedReplicationTask.FirstEventID,
newBufferedReplicationTask.FirstEventID,
newBufferedReplicationTask.NextEventID,
newBufferedReplicationTask.Version,
newBufferedReplicationTask.History.EncodingType,
newBufferedReplicationTask.History.Version,
newBufferedReplicationTask.History.Data,
d.shardID,
rowTypeExecution,
domainID,
workflowID,
runID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
condition)
}
}

// deleteInfo is the FirstEventID for the history batch being deleted
if deleteInfo != nil {
batch.Query(templateDeleteBufferedReplicationTaskQuery,
*deleteInfo,
d.shardID,
rowTypeExecution,
domainID,
workflowID,
runID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
condition)
}
}

func createShardInfo(currentCluster string, result map[string]interface{}) *ShardInfo {
info := &ShardInfo{}
for k, v := range result {
Expand Down Expand Up @@ -2804,6 +2928,28 @@ func createSignalInfo(result map[string]interface{}) *SignalInfo {
return info
}

func createBufferedReplicationTaskInfo(result map[string]interface{}) *BufferedReplicationTask {
info := &BufferedReplicationTask{}
for k, v := range result {
switch k {
case "first_event_id":
info.FirstEventID = v.(int64)
case "next_event_id":
info.NextEventID = v.(int64)
case "version":
info.Version = v.(int64)
case "history":
h := v.(map[string]interface{})
info.History = createSerializedHistoryEventBatch(h)
case "new_run_history":
h := v.(map[string]interface{})
info.NewRunHistory = createSerializedHistoryEventBatch(h)
}
}

return info
}

func createSerializedHistoryEventBatch(result map[string]interface{}) *SerializedHistoryEventBatch {
// TODO: default to JSON, update this when we support different encoding types.
eventBatch := &SerializedHistoryEventBatch{EncodingType: common.EncodingTypeJSON}
Expand Down
Loading

0 comments on commit 99f36c9

Please sign in to comment.