Skip to content

Commit

Permalink
Handle applyParentClose target domain failover (cadence-workflow#4533)
Browse files Browse the repository at this point in the history
  • Loading branch information
demirkayaender authored Oct 16, 2021
1 parent 13c6a2b commit d9e5003
Show file tree
Hide file tree
Showing 25 changed files with 393 additions and 106 deletions.
160 changes: 147 additions & 13 deletions .gen/go/sqlblobs/sqlblobs.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmd/server/cadence/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s *ServerSuite) TestServerStartup() {
if err != nil {
log.Fatal("Config file corrupted.", err)
}

if os.Getenv("CASSANDRA_SEEDS") == "cassandra" {
// replace local host to docker network
// this env variable value is set by buildkite's docker-compose
Expand Down
12 changes: 8 additions & 4 deletions common/persistence/dataManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ type (
VisibilityTimestamp time.Time
TaskID int64
TargetDomainID string
TargetDomainIDs map[string]struct{} // used for ApplyParentPolicy request
TargetWorkflowID string
TargetRunID string
TargetChildWorkflowOnly bool
Expand Down Expand Up @@ -585,6 +586,7 @@ type (

// ApplyParentClosePolicyTask identifies a task for applying parent close policy
ApplyParentClosePolicyTask struct {
TargetDomainIDs map[string]struct{}
VisibilityTimestamp time.Time
TaskID int64
Version int64
Expand Down Expand Up @@ -2636,17 +2638,19 @@ func (t *TransferTaskInfo) GetRunID() string {
return t.RunID
}

// GetTargetDomainIDs returns the set of target domain IDs stored on the
// transfer task (the field used for ApplyParentPolicy requests).
// The task's own map is returned as-is, not a copy.
func (t *TransferTaskInfo) GetTargetDomainIDs() map[string]struct{} {
return t.TargetDomainIDs
}

// GetDomainID returns the DomainID field of the transfer task.
func (t *TransferTaskInfo) GetDomainID() string {
return t.DomainID
}

// String returns a string representation for transfer task
func (t *TransferTaskInfo) String() string {
return fmt.Sprintf(
"{DomainID: %v, WorkflowID: %v, RunID: %v, TaskID: %v, TargetDomainID: %v, TargetWorkflowID %v, TargetRunID: %v, TargetChildWorkflowOnly: %v, TaskList: %v, TaskType: %v, ScheduleID: %v, Version: %v.}",
t.DomainID, t.WorkflowID, t.RunID, t.TaskID, t.TargetDomainID, t.TargetWorkflowID, t.TargetRunID, t.TargetChildWorkflowOnly, t.TaskList, t.TaskType, t.ScheduleID, t.Version,
)
return fmt.Sprintf("%#v", t)
}

// GetTaskID returns the task ID for replication task
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/nosql/nosqlExecutionStoreUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ func (d *nosqlExecutionStore) prepareCrossClusterTasksForWorkflowTxn(
var targetCluster string
targetDomainID := domainID // default to source domain, can't be empty, since empty string is not valid UUID
var targetWorkflowID string
targetDomainIDs := map[string]struct{}{}
targetRunID := p.CrossClusterTaskDefaultTargetRunID
targetChildWorkflowOnly := false
recordVisibility := false
Expand Down Expand Up @@ -353,6 +354,7 @@ func (d *nosqlExecutionStore) prepareCrossClusterTasksForWorkflowTxn(

case p.CrossClusterTaskTypeApplyParentPolicy:
targetCluster = task.(*p.CrossClusterApplyParentClosePolicyTask).TargetCluster
targetDomainIDs = task.(*p.CrossClusterApplyParentClosePolicyTask).TargetDomainIDs

default:
return nil, &types.InternalServiceError{
Expand All @@ -369,6 +371,7 @@ func (d *nosqlExecutionStore) prepareCrossClusterTasksForWorkflowTxn(
VisibilityTimestamp: task.GetVisibilityTimestamp(),
TaskID: task.GetTaskID(),
TargetDomainID: targetDomainID,
TargetDomainIDs: targetDomainIDs,
TargetWorkflowID: targetWorkflowID,
TargetRunID: targetRunID,
TargetChildWorkflowOnly: targetChildWorkflowOnly,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ const (
`visibility_ts: ?, ` +
`task_id: ?, ` +
`target_domain_id: ?, ` +
`target_domain_ids: ?,` +
`target_workflow_id: ?, ` +
`target_run_id: ?, ` +
`target_child_workflow_only: ?, ` +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,13 @@ func parseTransferTaskInfo(
info.TaskID = v.(int64)
case "target_domain_id":
info.TargetDomainID = v.(gocql.UUID).String()
case "target_domain_ids":
targetDomainIDs := make(map[string]struct{})
dList := mustConvertToSlice(result["target_domain_ids"])
for _, v := range dList {
targetDomainIDs[v.(gocql.UUID).String()] = struct{}{}
}
info.TargetDomainIDs = targetDomainIDs
case "target_workflow_id":
info.TargetWorkflowID = v.(string)
case "target_run_id":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ func (db *cdb) createTransferTasks(
task.VisibilityTimestamp,
task.TaskID,
task.TargetDomainID,
task.TargetDomainIDs,
task.TargetWorkflowID,
task.TargetRunID,
task.TargetChildWorkflowOnly,
Expand Down Expand Up @@ -424,6 +425,7 @@ func (db *cdb) createCrossClusterTasks(
task.VisibilityTimestamp,
task.TaskID,
task.TargetDomainID,
task.TargetDomainIDs,
task.TargetWorkflowID,
task.TargetRunID,
task.TargetChildWorkflowOnly,
Expand Down
17 changes: 15 additions & 2 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2333,7 +2333,16 @@ func (s *ExecutionManagerSuite) TestCrossClusterTasks() {
Version: 123,
},
}
crossClusterTasks := append(crossClusterTasks1, crossClusterTasks2)
crossClusterTasks3 := &p.CrossClusterApplyParentClosePolicyTask{
TargetCluster: remoteClusterName2,
ApplyParentClosePolicyTask: p.ApplyParentClosePolicyTask{
VisibilityTimestamp: now,
TaskID: s.GetNextSequenceNumber(),
TargetDomainIDs: map[string]struct{}{uuid.New(): {}, uuid.New(): {}},
Version: 123,
},
}
crossClusterTasks := append(crossClusterTasks1, crossClusterTasks2, crossClusterTasks3)

versionHistory := p.NewVersionHistory([]byte{}, []*p.VersionHistoryItem{
{
Expand All @@ -2360,11 +2369,13 @@ func (s *ExecutionManagerSuite) TestCrossClusterTasks() {
// check created tasks for cluster 2
respTasks, err = s.GetCrossClusterTasks(ctx, remoteClusterName2, 0, 1, true)
s.NoError(err)
s.validateCrossClusterTasks([]p.Task{crossClusterTasks2}, respTasks)
s.validateCrossClusterTasks([]p.Task{crossClusterTasks2, crossClusterTasks3}, respTasks)

// range delete tasks for cluster 1
err = s.CompleteCrossClusterTask(ctx, remoteClusterName2, respTasks[0].TaskID)
s.NoError(err)
err = s.CompleteCrossClusterTask(ctx, remoteClusterName2, respTasks[1].TaskID)
s.NoError(err)
respTasks, err = s.GetCrossClusterTasks(ctx, remoteClusterName2, 0, 1, true)
s.NoError(err)
s.Empty(respTasks)
Expand Down Expand Up @@ -2393,6 +2404,8 @@ func (s *ExecutionManagerSuite) validateCrossClusterTasks(
s.Equal(task.TargetWorkflowID, loadedTaskInfo[index].TargetWorkflowID)
s.Equal(task.TargetRunID, loadedTaskInfo[index].TargetRunID)
s.Equal(task.TargetChildWorkflowOnly, loadedTaskInfo[index].TargetChildWorkflowOnly)
case *p.CrossClusterApplyParentClosePolicyTask:
s.Equal(task.TargetDomainIDs, loadedTaskInfo[index].GetTargetDomainIDs())
default:
s.FailNow("unknown cross cluster task type")
}
Expand Down
6 changes: 4 additions & 2 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package persistencetests

import (
"context"
"fmt"
"math"
"math/rand"
"sync/atomic"
Expand Down Expand Up @@ -1003,12 +1004,13 @@ func (s *TestBase) UpdateWorkflowExecutionWithReplication(
case *p.DecisionTask, *p.ActivityTask, *p.CloseExecutionTask, *p.CancelExecutionTask, *p.StartChildExecutionTask, *p.SignalExecutionTask,
*p.RecordWorkflowStartedTask, *p.ResetWorkflowTask, *p.UpsertWorkflowSearchAttributesTask:
transferTasks = append(transferTasks, t)
case *p.CrossClusterStartChildExecutionTask, *p.CrossClusterCancelExecutionTask, *p.CrossClusterSignalExecutionTask:
case *p.CrossClusterStartChildExecutionTask, *p.CrossClusterCancelExecutionTask, *p.CrossClusterSignalExecutionTask,
*p.CrossClusterRecordChildWorkflowExecutionCompleteTask, *p.CrossClusterApplyParentClosePolicyTask:
crossClusterTasks = append(crossClusterTasks, t)
case *p.HistoryReplicationTask, *p.SyncActivityTask:
replicationTasks = append(replicationTasks, t)
default:
panic("Unknown transfer task type.")
panic(fmt.Sprintf("Unknown transfer task type. %v", t))
}
}
for _, decisionScheduleID := range decisionScheduleIDs {
Expand Down
12 changes: 12 additions & 0 deletions common/persistence/serialization/getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -1436,6 +1436,18 @@ func (t *TransferTaskInfo) GetTargetDomainID() (o []byte) {
return
}

// GetTargetDomainIDs internal sql blob getter.
//
// It converts the TargetDomainIDs slice into a set keyed by the String()
// form of each ID; duplicate IDs collapse into a single key.
// A nil receiver yields a nil map, while a non-nil receiver always yields a
// non-nil (possibly empty) map.
func (t *TransferTaskInfo) GetTargetDomainIDs() (o map[string]struct{}) {
	if t == nil {
		return nil
	}
	// The key count is bounded by the slice length, so size the map once
	// up front instead of letting it grow while it is filled.
	o = make(map[string]struct{}, len(t.TargetDomainIDs))
	for _, domainID := range t.TargetDomainIDs {
		o[domainID.String()] = struct{}{}
	}
	return o
}

// GetTargetWorkflowID internal sql blob getter
func (t *TransferTaskInfo) GetTargetWorkflowID() (o string) {
if t != nil {
Expand Down
1 change: 1 addition & 0 deletions common/persistence/serialization/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ type (
RunID UUID
TaskType int16
TargetDomainID UUID
TargetDomainIDs []UUID
TargetWorkflowID string
TargetRunID UUID
TaskList string
Expand Down
40 changes: 28 additions & 12 deletions common/persistence/serialization/thrift_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,12 +584,13 @@ func transferTaskInfoToThrift(info *TransferTaskInfo) *sqlblobs.TransferTaskInfo
if info == nil {
return nil
}
return &sqlblobs.TransferTaskInfo{
DomainID: info.DomainID,
WorkflowID: &info.WorkflowID,
RunID: info.RunID,
TaskType: &info.TaskType,
TargetDomainID: info.TargetDomainID,
thriftTaskInfo := &sqlblobs.TransferTaskInfo{
DomainID: info.DomainID,
WorkflowID: &info.WorkflowID,
RunID: info.RunID,
TaskType: &info.TaskType,
TargetDomainID: info.TargetDomainID,
// TargetDomainIDs will be assigned below
TargetWorkflowID: &info.TargetWorkflowID,
TargetRunID: info.TargetRunID,
TaskList: &info.TaskList,
Expand All @@ -598,18 +599,26 @@ func transferTaskInfoToThrift(info *TransferTaskInfo) *sqlblobs.TransferTaskInfo
Version: &info.Version,
VisibilityTimestampNanos: timeToUnixNanoPtr(info.VisibilityTimestamp),
}
if len(info.TargetDomainIDs) > 0 {
thriftTaskInfo.TargetDomainIDs = [][]byte{}
for _, domainID := range info.TargetDomainIDs {
thriftTaskInfo.TargetDomainIDs = append(thriftTaskInfo.TargetDomainIDs, domainID)
}
}
return thriftTaskInfo
}

func transferTaskInfoFromThrift(info *sqlblobs.TransferTaskInfo) *TransferTaskInfo {
if info == nil {
return nil
}
return &TransferTaskInfo{
DomainID: info.DomainID,
WorkflowID: info.GetWorkflowID(),
RunID: info.RunID,
TaskType: info.GetTaskType(),
TargetDomainID: info.TargetDomainID,
transferTaskInfo := &TransferTaskInfo{
DomainID: info.DomainID,
WorkflowID: info.GetWorkflowID(),
RunID: info.RunID,
TaskType: info.GetTaskType(),
TargetDomainID: info.TargetDomainID,
// TargetDomainIDs will be assigned below
TargetWorkflowID: info.GetTargetWorkflowID(),
TargetRunID: info.TargetRunID,
TaskList: info.GetTaskList(),
Expand All @@ -618,6 +627,13 @@ func transferTaskInfoFromThrift(info *sqlblobs.TransferTaskInfo) *TransferTaskIn
Version: info.GetVersion(),
VisibilityTimestamp: timeFromUnixNano(info.GetVisibilityTimestampNanos()),
}
if len(info.GetTargetDomainIDs()) > 0 {
transferTaskInfo.TargetDomainIDs = []UUID{}
for _, domainID := range info.GetTargetDomainIDs() {
transferTaskInfo.TargetDomainIDs = append(transferTaskInfo.TargetDomainIDs, domainID)
}
}
return transferTaskInfo
}

func crossClusterTaskInfoToThrift(info *CrossClusterTaskInfo) *sqlblobsCrossClusterTaskInfo {
Expand Down
1 change: 1 addition & 0 deletions common/persistence/serialization/thrift_mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ func TestTransferTaskInfo(t *testing.T) {
RunID: UUID(uuid.New()),
TaskType: int16(rand.Intn(1000)),
TargetDomainID: UUID(uuid.New()),
TargetDomainIDs: []UUID{UUID(uuid.New()), UUID(uuid.New())},
TargetWorkflowID: "TargetWorkflowID",
TargetRunID: UUID(uuid.New()),
TaskList: "TaskList",
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/sql/sqlExecutionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,7 @@ func (m *sqlExecutionStore) GetTransferTasks(
RunID: info.RunID.String(),
VisibilityTimestamp: info.GetVisibilityTimestamp(),
TargetDomainID: info.TargetDomainID.String(),
TargetDomainIDs: info.GetTargetDomainIDs(),
TargetWorkflowID: info.GetTargetWorkflowID(),
TargetRunID: info.TargetRunID.String(),
TargetChildWorkflowOnly: info.GetTargetChildWorkflowOnly(),
Expand Down Expand Up @@ -863,6 +864,7 @@ func (m *sqlExecutionStore) GetCrossClusterTasks(
RunID: info.RunID.String(),
VisibilityTimestamp: info.GetVisibilityTimestamp(),
TargetDomainID: info.TargetDomainID.String(),
TargetDomainIDs: info.GetTargetDomainIDs(),
TargetWorkflowID: info.GetTargetWorkflowID(),
TargetRunID: info.TargetRunID.String(),
TargetChildWorkflowOnly: info.GetTargetChildWorkflowOnly(),
Expand Down
4 changes: 4 additions & 0 deletions common/persistence/sql/sqlExecutionStoreUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,10 @@ func createCrossClusterTasks(
crossClusterTasksRows[i].TargetCluster = task.(*p.CrossClusterRecordChildWorkflowExecutionCompleteTask).TargetCluster

case p.CrossClusterTaskTypeApplyParentPolicy:
info.TargetDomainIDs = []serialization.UUID{}
for domainID := range task.(*p.CrossClusterApplyParentClosePolicyTask).TargetDomainIDs {
info.TargetDomainIDs = append(info.TargetDomainIDs, serialization.MustParseUUID(domainID))
}
crossClusterTasksRows[i].TargetCluster = task.(*p.CrossClusterApplyParentClosePolicyTask).TargetCluster

default:
Expand Down
2 changes: 1 addition & 1 deletion idls
Submodule idls updated from 9f6224 to 133222
23 changes: 12 additions & 11 deletions schema/cassandra/cadence/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -113,20 +113,21 @@ CREATE TYPE replication_state (

-- TODO: Remove fields that are left over from activity and workflow tasks.
-- transfer_task holds the per-task fields of a transfer task; the `type` enum below also lists cross-cluster task types.
CREATE TYPE transfer_task (
domain_id uuid, -- The domain ID that this transfer task belongs to
workflow_id text, -- The workflow ID that this transfer task belongs to
run_id uuid, -- The run ID that this transfer task belongs to
domain_id uuid, -- The domain ID that this transfer task belongs to
workflow_id text, -- The workflow ID that this transfer task belongs to
run_id uuid, -- The run ID that this transfer task belongs to
task_id bigint,
visibility_ts timestamp, -- The timestamp when the transfer task is generated
target_domain_id uuid, -- The external domain ID that this transfer task is doing work for.
target_workflow_id text, -- The external workflow ID that this transfer task is doing work for.
target_run_id uuid, -- The external run ID that this transfer task is doing work for.
target_child_workflow_only boolean, -- Whether to target child workflow only.
visibility_ts timestamp, -- The timestamp when the transfer task is generated
target_domain_id uuid, -- The external domain ID that this transfer task is doing work for.
target_domain_ids set<uuid>, -- The set of external domain IDs that this transfer task is doing work for (used for ApplyParentPolicy requests).
target_workflow_id text, -- The external workflow ID that this transfer task is doing work for.
target_run_id uuid, -- The external run ID that this transfer task is doing work for.
target_child_workflow_only boolean, -- Whether to target child workflow only.
task_list text,
type int, -- enum TaskType For local: {Decision, Activity, CloseExecution, CancelExecution, StartChildExecution, SignalExecution, RecordWorkflowStarted, ResetWorkflow, UpsertWorkflowSearchAttributes}, or for crossCluster {StartChildExecution, CancelExecution, SignalExecution}
type int, -- enum TaskType For local: {Decision, Activity, CloseExecution, CancelExecution, StartChildExecution, SignalExecution, RecordWorkflowStarted, ResetWorkflow, UpsertWorkflowSearchAttributes}, or for crossCluster {StartChildExecution, CancelExecution, SignalExecution}
schedule_id bigint,
version bigint, -- the failover version when this task is created, used to compare against the mutable state, in case the events got overwritten
record_visibility boolean, -- indicates whether or not to create a visibility record
version bigint, -- the failover version when this task is created, used to compare against the mutable state, in case the events got overwritten
record_visibility boolean, -- indicates whether or not to create a visibility record
);

CREATE TYPE replication_task (
Expand Down
8 changes: 8 additions & 0 deletions schema/cassandra/cadence/versioned/v0.33/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"CurrVersion": "0.33",
"MinCompatibleVersion": "0.33",
"Description": "Added target domain ids to the executions table",
"SchemaUpdateCqlFiles": [
"target_domain_ids.cql"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TYPE transfer_task ADD target_domain_ids set<uuid>;
2 changes: 1 addition & 1 deletion schema/cassandra/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ package cassandra
// NOTE: whenever there is a new database schema update, please update the following versions

// Version is the Cassandra database release version
const Version = "0.32"
const Version = "0.33"

// VisibilityVersion is the Cassandra visibility database release version
const VisibilityVersion = "0.7"
Loading

0 comments on commit d9e5003

Please sign in to comment.