Skip to content

Commit

Permalink
Add cross-cluster task related types and methods to data/persistence …
Browse files Browse the repository at this point in the history
…interface (cadence-workflow#4225)
  • Loading branch information
yycptt authored May 26, 2021
1 parent e3e0c26 commit bda4c5c
Show file tree
Hide file tree
Showing 16 changed files with 483 additions and 32 deletions.
3 changes: 3 additions & 0 deletions common/log/tag/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,12 @@ var (
StoreOperationIsWorkflowExecutionExists = storeOperation("is-wf-execution-exists")
StoreOperationListConcreteExecution = storeOperation("list-concrete-execution")
StoreOperationGetTransferTasks = storeOperation("get-transfer-tasks")
StoreOperationGetCrossClusterTasks = storeOperation("get-cross-cluster-tasks")
StoreOperationGetReplicationTasks = storeOperation("get-replication-tasks")
StoreOperationCompleteTransferTask = storeOperation("complete-transfer-task")
StoreOperationRangeCompleteTransferTask = storeOperation("range-complete-transfer-task")
StoreOperationCompleteCrossClusterTask = storeOperation("complete-cross-cluster-task")
StoreOperationRangeCompleteCrossClusterTask = storeOperation("range-complete-cross-cluster-task")
StoreOperationCompleteReplicationTask = storeOperation("complete-replication-task")
StoreOperationRangeCompleteReplicationTask = storeOperation("range-complete-replication-task")
StoreOperationPutReplicationTaskToDLQ = storeOperation("put-replication-task-to-dlq")
Expand Down
9 changes: 9 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ const (
PersistenceCompleteTransferTaskScope
// PersistenceRangeCompleteTransferTaskScope tracks CompleteTransferTasks calls made by service to persistence layer
PersistenceRangeCompleteTransferTaskScope
// PersistenceGetCrossClusterTasksScope tracks GetCrossClusterTasks calls made by service to persistence layer
PersistenceGetCrossClusterTasksScope
// PersistenceCompleteCrossClusterTaskScope tracks CompleteCrossClusterTasks calls made by service to persistence layer
PersistenceCompleteCrossClusterTaskScope
// PersistenceRangeCompleteCrossClusterTaskScope tracks CompleteCrossClusterTasks calls made by service to persistence layer
PersistenceRangeCompleteCrossClusterTaskScope
// PersistenceGetReplicationTasksScope tracks GetReplicationTasks calls made by service to persistence layer
PersistenceGetReplicationTasksScope
// PersistenceCompleteReplicationTaskScope tracks CompleteReplicationTasks calls made by service to persistence layer
Expand Down Expand Up @@ -1109,6 +1115,9 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceGetTransferTasksScope: {operation: "GetTransferTasks"},
PersistenceCompleteTransferTaskScope: {operation: "CompleteTransferTask"},
PersistenceRangeCompleteTransferTaskScope: {operation: "RangeCompleteTransferTask"},
PersistenceGetCrossClusterTasksScope: {operation: "GetCrossClusterTasks"},
PersistenceCompleteCrossClusterTaskScope: {operation: "GetCrossClusterTasks"},
PersistenceRangeCompleteCrossClusterTaskScope: {operation: "GetCrossClusterTasks"},
PersistenceGetReplicationTasksScope: {operation: "GetReplicationTasks"},
PersistenceCompleteReplicationTaskScope: {operation: "CompleteReplicationTask"},
PersistenceRangeCompleteReplicationTaskScope: {operation: "RangeCompleteReplicationTask"},
Expand Down
59 changes: 57 additions & 2 deletions common/mocks/ExecutionManager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ const (
// Row Constants for Replication Task DLQ Row. Source cluster name will be used as WorkflowID.
rowTypeDLQDomainID = "10000000-6000-f000-f000-000000000000"
rowTypeDLQRunID = "30000000-6000-f000-f000-000000000000"
// TODO: add rowType for cross-region tasks
// Special TaskId constants
rowTypeExecutionTaskID = int64(-10)
rowTypeShardTaskID = int64(-11)
Expand All @@ -98,6 +99,7 @@ const (
rowTypeTimerTask
rowTypeReplicationTask
rowTypeDLQ
// TODO: add row type
)

const (
Expand Down Expand Up @@ -1771,6 +1773,14 @@ func (d *cassandraPersistence) GetTransferTasks(
return response, nil
}

func (d *cassandraPersistence) GetCrossClusterTasks(
ctx context.Context,
request *p.GetCrossClusterTasksRequest,
) (*p.GetCrossClusterTasksResponse, error) {
// TODO: Implement GetCrossClusterTasks
panic("not implemented")
}

func (d *cassandraPersistence) GetReplicationTasks(
ctx context.Context,
request *p.GetReplicationTasksRequest,
Expand Down Expand Up @@ -1866,6 +1876,22 @@ func (d *cassandraPersistence) RangeCompleteTransferTask(
return nil
}

func (d *cassandraPersistence) CompleteCrossClusterTask(
ctx context.Context,
request *p.CompleteCrossClusterTaskRequest,
) error {
// TODO: Implement CompleteCrossClusterTask
panic("not implemented")
}

func (d *cassandraPersistence) RangeCompleteCrossClusterTask(
ctx context.Context,
request *p.RangeCompleteCrossClusterTaskRequest,
) error {
// TODO: Implement RangeCompleteCrossClusterTask
panic("not implemented")
}

func (d *cassandraPersistence) CompleteReplicationTask(
ctx context.Context,
request *p.CompleteReplicationTaskRequest,
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/cassandra/cassandraPersistenceUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func applyWorkflowMutationBatch(
workflowID,
runID,
workflowMutation.TransferTasks,
workflowMutation.CrossClusterTasks,
workflowMutation.ReplicationTasks,
workflowMutation.TimerTasks,
)
Expand Down Expand Up @@ -250,6 +251,7 @@ func applyWorkflowSnapshotBatchAsReset(
workflowID,
runID,
workflowSnapshot.TransferTasks,
workflowSnapshot.CrossClusterTasks,
workflowSnapshot.ReplicationTasks,
workflowSnapshot.TimerTasks,
)
Expand Down Expand Up @@ -353,6 +355,7 @@ func applyWorkflowSnapshotBatchAsNew(
workflowID,
runID,
workflowSnapshot.TransferTasks,
workflowSnapshot.CrossClusterTasks,
workflowSnapshot.ReplicationTasks,
workflowSnapshot.TimerTasks,
)
Expand Down Expand Up @@ -602,6 +605,7 @@ func applyTasks(
workflowID string,
runID string,
transferTasks []p.Task,
crossClusterTasks []p.Task,
replicationTasks []p.Task,
timerTasks []p.Task,
) error {
Expand All @@ -617,6 +621,8 @@ func applyTasks(
return err
}

// TODO: create cross-cluster tasks

if err := createReplicationTasks(
batch,
replicationTasks,
Expand Down
Loading

0 comments on commit bda4c5c

Please sign in to comment.