Skip to content

Commit

Permalink
Cassandra implementation for cross cluster queue (cadence-workflow#4237)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jun 1, 2021
1 parent de3a6ac commit 6f77ae2
Show file tree
Hide file tree
Showing 12 changed files with 991 additions and 539 deletions.
106 changes: 97 additions & 9 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var _ p.ExecutionStore = (*cassandraPersistence)(nil)
// Where x is any hexadecimal value, E represents the entity type valid values are:
// E = {DomainID = 1, WorkflowID = 2, RunID = 3}
// R represents row type in executions table, valid values are:
// R = {Shard = 1, Execution = 2, Transfer = 3, Timer = 4, Replication = 5}
// R = {Shard = 1, Execution = 2, Transfer = 3, Timer = 4, Replication = 5, Replication_DLQ = 6, CrossCluster = 7}
const (
// Special Domains related constants
emptyDomainID = "10000000-0000-f000-f000-000000000000"
Expand All @@ -82,7 +82,9 @@ const (
// Row Constants for Replication Task DLQ Row. Source cluster name will be used as WorkflowID.
rowTypeDLQDomainID = "10000000-6000-f000-f000-000000000000"
rowTypeDLQRunID = "30000000-6000-f000-f000-000000000000"
// TODO: add rowType for cross-region tasks
// Row Constants for Cross Cluster Task Row
rowTypeCrossClusterDomainID = "10000000-7000-f000-f000-000000000000"
rowTypeCrossClusterRunID = "30000000-7000-f000-f000-000000000000"
// Special TaskId constants
rowTypeExecutionTaskID = int64(-10)
rowTypeShardTaskID = int64(-11)
Expand All @@ -99,7 +101,7 @@ const (
rowTypeTimerTask
rowTypeReplicationTask
rowTypeDLQ
// TODO: add row type
rowTypeCrossClusterTask
)

const (
Expand Down Expand Up @@ -181,6 +183,8 @@ const (
`version: ?` +
`}`

templateCrossClusterTaskType = templateTransferTaskType

templateReplicationTaskType = `{` +
`domain_id: ?, ` +
`workflow_id: ?, ` +
Expand Down Expand Up @@ -324,6 +328,10 @@ workflow_state = ? ` +
`shard_id, type, domain_id, workflow_id, run_id, transfer, visibility_ts, task_id) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateTransferTaskType + `, ?, ?)`

templateCreateCrossClusterTaskQuery = `INSERT INTO executions (` +
`shard_id, type, domain_id, workflow_id, run_id, cross_cluster, visibility_ts, task_id) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateCrossClusterTaskType + `, ?, ?)`

templateCreateReplicationTaskQuery = `INSERT INTO executions (` +
`shard_id, type, domain_id, workflow_id, run_id, replication, visibility_ts, task_id) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateReplicationTaskType + `, ?, ?)`
Expand Down Expand Up @@ -636,6 +644,17 @@ workflow_state = ? ` +
`and task_id > ? ` +
`and task_id <= ?`

templateGetCrossClusterTasksQuery = `SELECT cross_cluster ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id > ? ` +
`and task_id <= ?`

templateGetReplicationTasksQuery = `SELECT replication ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
Expand Down Expand Up @@ -674,6 +693,10 @@ workflow_state = ? ` +
`and task_id > ? ` +
`and task_id <= ?`

templateCompleteCrossClusterTaskQuery = templateCompleteTransferTaskQuery

templateRangeCompleteCrossClusterTaskQuery = templateRangeCompleteTransferTaskQuery

templateCompleteReplicationTaskBeforeQuery = `DELETE FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand Down Expand Up @@ -1663,8 +1686,44 @@ func (d *cassandraPersistence) GetCrossClusterTasks(
ctx context.Context,
request *p.GetCrossClusterTasksRequest,
) (*p.GetCrossClusterTasksResponse, error) {
// TODO: Implement GetCrossClusterTasks
panic("not implemented")

// Reading cross-cluster tasks need to be quorum level consistent, otherwise we could loose task
query := d.session.Query(templateGetCrossClusterTasksQuery,
d.shardID,
rowTypeCrossClusterTask,
rowTypeCrossClusterDomainID,
request.TargetCluster, // workflowID field is used to store target cluster
rowTypeCrossClusterRunID,
defaultVisibilityTimestamp,
request.ReadLevel,
request.MaxReadLevel,
).PageSize(request.BatchSize).PageState(request.NextPageToken).WithContext(ctx)

iter := query.Iter()
if iter == nil {
return nil, &types.InternalServiceError{
Message: "GetCrossClusterTasks operation failed. Not able to create query iterator.",
}
}

response := &p.GetCrossClusterTasksResponse{}
task := make(map[string]interface{})
for iter.MapScan(task) {
t := createCrossClusterTaskInfo(task["cross_cluster"].(map[string]interface{}))
// Reset task map to get it ready for next scan
task = make(map[string]interface{})

response.Tasks = append(response.Tasks, t)
}
nextPageToken := iter.PageState()
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)

if err := iter.Close(); err != nil {
return nil, convertCommonErrors(d.client, "GetCrossClusterTasks", err)
}

return response, nil
}

func (d *cassandraPersistence) GetReplicationTasks(
Expand Down Expand Up @@ -1766,16 +1825,45 @@ func (d *cassandraPersistence) CompleteCrossClusterTask(
ctx context.Context,
request *p.CompleteCrossClusterTaskRequest,
) error {
// TODO: Implement CompleteCrossClusterTask
panic("not implemented")
query := d.session.Query(templateCompleteCrossClusterTaskQuery,
d.shardID,
rowTypeCrossClusterTask,
rowTypeCrossClusterDomainID,
request.TargetCluster,
rowTypeCrossClusterRunID,
defaultVisibilityTimestamp,
request.TaskID,
).WithContext(ctx)

err := query.Exec()
if err != nil {
return convertCommonErrors(d.client, "CompleteCrossClusterTask", err)
}

return nil
}

func (d *cassandraPersistence) RangeCompleteCrossClusterTask(
ctx context.Context,
request *p.RangeCompleteCrossClusterTaskRequest,
) error {
// TODO: Implement RangeCompleteCrossClusterTask
panic("not implemented")
query := d.session.Query(templateRangeCompleteCrossClusterTaskQuery,
d.shardID,
rowTypeCrossClusterTask,
rowTypeCrossClusterDomainID,
request.TargetCluster,
rowTypeCrossClusterRunID,
defaultVisibilityTimestamp,
request.ExclusiveBeginTaskID,
request.InclusiveEndTaskID,
).WithContext(ctx)

err := query.Exec()
if err != nil {
return convertCommonErrors(d.client, "RangeCompleteCrossClusterTask", err)
}

return nil
}

func (d *cassandraPersistence) CompleteReplicationTask(
Expand Down
107 changes: 105 additions & 2 deletions common/persistence/cassandra/cassandraPersistenceUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,16 @@ func applyTasks(
return err
}

// TODO: create cross-cluster tasks
if err := createCrossClusterTasks(
batch,
crossClusterTasks,
shardID,
domainID,
workflowID,
runID,
); err != nil {
return err
}

if err := createReplicationTasks(
batch,
Expand Down Expand Up @@ -707,7 +716,7 @@ func createTransferTasks(

default:
return &types.InternalServiceError{
Message: fmt.Sprintf("Unknow transfer type: %v", task.GetType()),
Message: fmt.Sprintf("Unknown transfer type: %v", task.GetType()),
}
}

Expand Down Expand Up @@ -738,6 +747,82 @@ func createTransferTasks(
return nil
}

func createCrossClusterTasks(
batch gocql.Batch,
crossClusterTasks []p.Task,
shardID int,
domainID string,
workflowID string,
runID string,
) error {

for _, task := range crossClusterTasks {
var taskList string
var scheduleID int64
var targetCluster string
var targetDomainID string
var targetWorkflowID string
targetRunID := p.CrossClusterTaskDefaultTargetRunID
targetChildWorkflowOnly := false
recordVisibility := false

switch task.GetType() {
case p.CrossClusterTaskTypeStartChildExecution:
targetCluster = task.(*p.CrossClusterStartChildExecutionTask).TargetCluster
targetDomainID = task.(*p.CrossClusterStartChildExecutionTask).TargetDomainID
targetWorkflowID = task.(*p.CrossClusterStartChildExecutionTask).TargetWorkflowID
scheduleID = task.(*p.CrossClusterStartChildExecutionTask).InitiatedID

case p.CrossClusterTaskTypeCancelExecution:
targetCluster = task.(*p.CrossClusterCancelExecutionTask).TargetCluster
targetDomainID = task.(*p.CrossClusterCancelExecutionTask).TargetDomainID
targetWorkflowID = task.(*p.CrossClusterCancelExecutionTask).TargetWorkflowID
targetRunID = task.(*p.CrossClusterCancelExecutionTask).TargetRunID
targetChildWorkflowOnly = task.(*p.CrossClusterCancelExecutionTask).TargetChildWorkflowOnly
scheduleID = task.(*p.CrossClusterCancelExecutionTask).InitiatedID

case p.CrossClusterTaskTypeSignalExecution:
targetCluster = task.(*p.CrossClusterSignalExecutionTask).TargetCluster
targetDomainID = task.(*p.CrossClusterSignalExecutionTask).TargetDomainID
targetWorkflowID = task.(*p.CrossClusterSignalExecutionTask).TargetWorkflowID
targetRunID = task.(*p.CrossClusterSignalExecutionTask).TargetRunID
targetChildWorkflowOnly = task.(*p.CrossClusterSignalExecutionTask).TargetChildWorkflowOnly
scheduleID = task.(*p.CrossClusterSignalExecutionTask).InitiatedID

default:
return &types.InternalServiceError{
Message: fmt.Sprintf("Unknown cross-cluster task type: %v", task.GetType()),
}
}

batch.Query(templateCreateCrossClusterTaskQuery,
shardID,
rowTypeCrossClusterTask,
rowTypeCrossClusterDomainID,
targetCluster,
rowTypeCrossClusterRunID,
domainID,
workflowID,
runID,
task.GetVisibilityTimestamp(),
task.GetTaskID(),
targetDomainID,
targetWorkflowID,
targetRunID,
targetChildWorkflowOnly,
taskList,
task.GetType(),
scheduleID,
recordVisibility,
task.GetVersion(),
defaultVisibilityTimestamp,
task.GetTaskID(),
)
}

return nil
}

func createReplicationTasks(
batch gocql.Batch,
replicationTasks []p.Task,
Expand Down Expand Up @@ -1624,6 +1709,24 @@ func createTransferTaskInfo(
return info
}

func createCrossClusterTaskInfo(
result map[string]interface{},
) *p.CrossClusterTaskInfo {
info := (*p.CrossClusterTaskInfo)(createTransferTaskInfo(result))
if p.CrossClusterTaskDefaultTargetRunID == p.TransferTaskTransferTargetRunID {
return info
}

// incase CrossClusterTaskDefaultTargetRunID is updated and not equal to TransferTaskTransferTargetRunID
if v, ok := result["target_run_id"]; ok {
info.TargetRunID = v.(gocql.UUID).String()
if info.TargetRunID == p.CrossClusterTaskDefaultTargetRunID {
info.TargetRunID = ""
}
}
return info
}

func createReplicationTaskInfo(
result map[string]interface{},
) *p.InternalReplicationTaskInfo {
Expand Down
56 changes: 55 additions & 1 deletion common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ const (
// TransferTaskTransferTargetRunID is the the dummy run ID for transfer tasks of types
// that do not have a target workflow
TransferTaskTransferTargetRunID = "30000000-0000-f000-f000-000000000002"
// CrossClusterTaskDefaultTargetRunID is the the dummy run ID for cross-cluster tasks of types
// that do not have a target workflow
CrossClusterTaskDefaultTargetRunID = TransferTaskTransferTargetRunID

// indicate invalid workflow state transition
invalidStateTransitionMsg = "unable to change workflow state from %v to %v, close status %v"
Expand Down Expand Up @@ -1057,11 +1060,13 @@ type (

// CompleteCrossClusterTaskRequest is used to complete a task in the cross-cluster task queue
CompleteCrossClusterTaskRequest struct {
TaskID int64
TargetCluster string
TaskID int64
}

// RangeCompleteCrossClusterTaskRequest is used to complete a range of tasks in the cross-cluster task queue
RangeCompleteCrossClusterTaskRequest struct {
TargetCluster string
ExclusiveBeginTaskID int64
InclusiveEndTaskID int64
}
Expand Down Expand Up @@ -2612,6 +2617,55 @@ func (t *TimerTaskInfo) String() string {
)
}

// Copy returns a copy of shardInfo
func (s *ShardInfo) Copy() *ShardInfo {
transferFailoverLevels := map[string]TransferFailoverLevel{}
for k, v := range s.TransferFailoverLevels {
transferFailoverLevels[k] = v
}
timerFailoverLevels := map[string]TimerFailoverLevel{}
for k, v := range s.TimerFailoverLevels {
timerFailoverLevels[k] = v
}
clusterTransferAckLevel := make(map[string]int64)
for k, v := range s.ClusterTransferAckLevel {
clusterTransferAckLevel[k] = v
}
clusterTimerAckLevel := make(map[string]time.Time)
for k, v := range s.ClusterTimerAckLevel {
clusterTimerAckLevel[k] = v
}
clusterReplicationLevel := make(map[string]int64)
for k, v := range s.ClusterReplicationLevel {
clusterReplicationLevel[k] = v
}
replicationDLQAckLevel := make(map[string]int64)
for k, v := range s.ReplicationDLQAckLevel {
replicationDLQAckLevel[k] = v
}
return &ShardInfo{
ShardID: s.ShardID,
Owner: s.Owner,
RangeID: s.RangeID,
StolenSinceRenew: s.StolenSinceRenew,
ReplicationAckLevel: s.ReplicationAckLevel,
TransferAckLevel: s.TransferAckLevel,
TimerAckLevel: s.TimerAckLevel,
TransferFailoverLevels: transferFailoverLevels,
TimerFailoverLevels: timerFailoverLevels,
ClusterTransferAckLevel: clusterTransferAckLevel,
ClusterTimerAckLevel: clusterTimerAckLevel,
TransferProcessingQueueStates: s.TransferProcessingQueueStates,
CrossClusterProcessQueueStates: s.CrossClusterProcessQueueStates,
TimerProcessingQueueStates: s.TimerProcessingQueueStates,
DomainNotificationVersion: s.DomainNotificationVersion,
ClusterReplicationLevel: clusterReplicationLevel,
ReplicationDLQAckLevel: replicationDLQAckLevel,
PendingFailoverMarkers: s.PendingFailoverMarkers,
UpdatedAt: s.UpdatedAt,
}
}

// SerializeClusterConfigs makes an array of *ClusterReplicationConfig serializable
// by flattening them into map[string]interface{}
func SerializeClusterConfigs(replicationConfigs []*ClusterReplicationConfig) []map[string]interface{} {
Expand Down
Loading

0 comments on commit 6f77ae2

Please sign in to comment.